/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

static struct kmem_cache *cor_connretrans_slab;

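/*
 * kref release function for struct cor_conn_retrans: called once the last
 * reference is dropped. The entry must already have been acked/cancelled,
 * it only frees the slab object and drops the conn reference.
 */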
void cor_free_connretrans(struct kref *ref)
{
	struct cor_conn_retrans *cr = container_of(ref, struct cor_conn_retrans,
			ref);
	struct cor_conn *cn = cr->trgt_out_o;

	BUG_ON(cr->state != CONN_RETRANS_ACKED);

	kmem_cache_free(cor_connretrans_slab, cr);
	cor_conn_kref_put(cn, "conn_retrans");
}

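/*
 * Pick the next retransmit to process from the two per-neighbor timeout
 * lists. Both lists are ordered by timeout, so only the heads need to be
 * compared: an already expired low-latency entry is preferred, otherwise
 * the entry with the earlier timeout wins.
 */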
static struct cor_conn_retrans *cor_peek_next_conn_retrans(
		struct cor_neighbor *nb_retransconnlocked)
{
	struct cor_conn_retrans *cr1 = 0;
	struct cor_conn_retrans *cr2 = 0;

	if (list_empty(&(nb_retransconnlocked->retrans_conn_lowlatency_list)) == 0)
		cr1 = container_of(
				nb_retransconnlocked->retrans_conn_lowlatency_list.next,
				struct cor_conn_retrans, timeout_list);

	if (list_empty(&(nb_retransconnlocked->retrans_conn_highlatency_list)) == 0)
		cr2 = container_of(
				nb_retransconnlocked->retrans_conn_highlatency_list.next,
				struct cor_conn_retrans, timeout_list);

	if (cr1 == 0)
		return cr2;
	if (cr2 == 0)
		return cr1;

	if (time_before_eq(cr1->timeout, jiffies))
		return cr1;

	if (time_before_eq(cr1->timeout, cr2->timeout))
		return cr1;

	return cr2;
}

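/*
 * (Re)arm the conn retransmit machinery: if the next retransmit is already
 * due, enqueue the neighbor for qos processing right away, otherwise program
 * retrans_conn_timer. mod_timer() returning 0 means the timer was not
 * pending before, so an additional neighbor reference is taken for it.
 */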
void cor_reschedule_conn_retrans_timer(
		struct cor_neighbor *nb_retransconnlocked)
{
	struct cor_conn_retrans *cr =
			cor_peek_next_conn_retrans(nb_retransconnlocked);

	if (cr == 0)
		return;

	if (time_before_eq(cr->timeout, jiffies)) {
		cor_qos_enqueue(nb_retransconnlocked->queue,
				&(nb_retransconnlocked->rb_cr), 0,
				ns_to_ktime(0), QOS_CALLER_CONN_RETRANS, 0);
	} else {
		if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
				cr->timeout) == 0) {
			cor_nb_kref_get(nb_retransconnlocked,
					"retrans_conn_timer");
		}
	}
}

/**
 * caller must also call kref_get/put, see cor_reschedule_conn_retrans_timer
 */
static void cor_cancel_conn_retrans(struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr,
		__u64 *bytes_acked)
{
	if (unlikely(cr->state == CONN_RETRANS_ACKED))
		return;

	if (cr->state == CONN_RETRANS_SCHEDULED) {
		list_del(&(cr->timeout_list));
		kref_put(&(cr->ref), cor_kreffree_bug);
	} else if (cr->state == CONN_RETRANS_LOWWINDOW) {
		BUG_ON(trgt_out_lx->trgt.out.retrans_lowwindow == 0);
		if (likely(trgt_out_lx->trgt.out.retrans_lowwindow != 65535))
			trgt_out_lx->trgt.out.retrans_lowwindow--;
	}

	if (cr->state != CONN_RETRANS_INITIAL)
		*bytes_acked += cr->length;

	list_del(&(cr->conn_list));
	cr->state = CONN_RETRANS_ACKED;

	kref_put(&(cr->ref), cor_free_connretrans); /* conn_list */
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_cancel_acked_conn_retrans(struct cor_conn *trgt_out_l,
		__u64 *bytes_acked)
{
	__u64 seqno_acked = trgt_out_l->trgt.out.seqno_acked;

	while (list_empty(&(trgt_out_l->trgt.out.retrans_list)) == 0) {
		struct cor_conn_retrans *cr = container_of(
				trgt_out_l->trgt.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);

		if (cor_seqno_after(cr->seqno + cr->length, seqno_acked)) {
			if (cor_seqno_before(cr->seqno, seqno_acked)) {
				*bytes_acked += cor_seqno_clean(seqno_acked -
						cr->seqno);
				cr->length -= cor_seqno_clean(seqno_acked -
						cr->seqno);
				cr->seqno = seqno_acked;
			}
			break;
		}

		cor_cancel_conn_retrans(trgt_out_l->trgt.out.nb, trgt_out_l, cr,
				bytes_acked);
	}

	cor_reschedule_conn_retrans_timer(trgt_out_l->trgt.out.nb);
}

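/*
 * Drop every pending retransmit of one connection, e.g. when the conn is
 * reset; the acked byte count is still reported to the congestion window.
 */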
void cor_cancel_all_conn_retrans(struct cor_conn *trgt_out_lx)
{
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;
	__u64 bytes_acked = 0;

	spin_lock_bh(&(nb->retrans_conn_lock));

	while (list_empty(&(trgt_out_lx->trgt.out.retrans_list)) == 0) {
		struct cor_conn_retrans *cr = container_of(
				trgt_out_lx->trgt.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);

		BUG_ON(cr->trgt_out_o != trgt_out_lx);

		cor_cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
	}

	cor_reschedule_conn_retrans_timer(nb);

	spin_unlock_bh(&(nb->retrans_conn_lock));

	if (bytes_acked > 0)
		cor_nbcongwin_data_acked(nb, bytes_acked);
}

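/*
 * Cancel every retransmit on one of the neighbor timeout lists. The conn
 * rcv_lock has to be taken before retrans_conn_lock, so the list head is
 * peeked under a temporary kref, both locks are then acquired in the
 * correct order and the entry is only cancelled if it is still the head.
 */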
static void _cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb,
		struct list_head *retrans_list, __u64 *bytes_acked)
{
	while (1) {
		struct cor_conn_retrans *cr;

		spin_lock_bh(&(nb->retrans_conn_lock));

		if (list_empty(retrans_list)) {
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		cr = container_of(retrans_list->next, struct cor_conn_retrans,
				timeout_list);

		kref_get(&(cr->ref));

		spin_unlock_bh(&(nb->retrans_conn_lock));

		spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
		spin_lock_bh(&(nb->retrans_conn_lock));

		if (likely(cr == container_of(retrans_list->next,
				struct cor_conn_retrans, timeout_list)))
			cor_cancel_conn_retrans(nb, cr->trgt_out_o, cr,
					bytes_acked);

		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));

		kref_put(&(cr->ref), cor_free_connretrans);
	}
}

static void cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb)
{
	__u64 bytes_acked = 0;

	_cor_cancel_all_conn_retrans_nb(nb,
			&(nb->retrans_conn_lowlatency_list), &bytes_acked);
	_cor_cancel_all_conn_retrans_nb(nb,
			&(nb->retrans_conn_highlatency_list), &bytes_acked);

	if (bytes_acked > 0)
		cor_nbcongwin_data_acked(nb, bytes_acked);
}

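/*
 * Allocate and queue a retransmit entry covering [seqno, seqno + len). The
 * returned entry holds two references: one from kref_init() for the caller
 * and one for its membership in the conn retrans_list. If it is a split of
 * cr_splitted, it is inserted directly behind it, otherwise at the tail.
 */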
static struct cor_conn_retrans *cor_prepare_conn_retrans(
		struct cor_conn *trgt_out_l, __u64 seqno, __u32 len,
		__u8 windowused, struct cor_conn_retrans *cr_splitted,
		int retransconnlocked)
{
	struct cor_neighbor *nb = trgt_out_l->trgt.out.nb;

	struct cor_conn_retrans *cr = kmem_cache_alloc(cor_connretrans_slab,
			GFP_ATOMIC);

	if (unlikely(cr == 0))
		return 0;

	BUG_ON(trgt_out_l->isreset != 0);

	memset(cr, 0, sizeof(struct cor_conn_retrans));
	cr->trgt_out_o = trgt_out_l;
	cor_conn_kref_get(trgt_out_l, "conn_retrans");
	cr->seqno = seqno;
	cr->length = len;
	cr->windowused = windowused;
	kref_init(&(cr->ref));

	if (retransconnlocked == 0)
		spin_lock_bh(&(nb->retrans_conn_lock));

	if (cr_splitted != 0)
		list_add(&(cr->conn_list), &(cr_splitted->conn_list));
	else
		list_add_tail(&(cr->conn_list),
				&(cr->trgt_out_o->trgt.out.retrans_list));
	kref_get(&(cr->ref)); /* conn_list */

	if (retransconnlocked == 0)
		spin_unlock_bh(&(nb->retrans_conn_lock));

	return cr;
}

#define RC_SENDRETRANS_OK 0
#define RC_SENDRETRANS_OOM 1
#define RC_SENDRETRANS_QUEUEFULL 2
#define RC_SENDRETRANS_QUEUEFULLDROPPED 3

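/*
 * Actually put a retransmit on the wire. Depending on the size either a
 * separate skb is sent or the data is handed to the control message layer
 * so it can share a packet with other messages (the decision is made by
 * cor_send_conndata_as_skb()).
 */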
static int __cor_send_retrans(struct cor_neighbor *nb,
		struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr,
		__u64 *bytes_sent)
{
	__u8 flush = 0;

	BUG_ON(cr->length == 0);

	if (trgt_out_l->flush != 0 && cor_seqno_eq(cr->seqno + cr->length,
			trgt_out_l->trgt.out.seqno_nextsend) &&
			trgt_out_l->data_buf.read_remaining == 0)
		flush = 1;

	if (cor_send_conndata_as_skb(nb, cr->length)) {
		struct sk_buff *skb;
		char *dst;
		int rc;

		skb = cor_create_packet_conndata(nb, cr->length, GFP_ATOMIC,
				trgt_out_l->trgt.out.conn_id, cr->seqno,
				cr->windowused, flush);
		if (unlikely(skb == 0))
			return RC_SENDRETRANS_OOM;

		dst = skb_put(skb, cr->length);

		cor_databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);

		rc = cor_dev_queue_xmit(skb, nb->queue,
				QOS_CALLER_CONN_RETRANS);
		if (rc == NET_XMIT_DROP)
			return RC_SENDRETRANS_QUEUEFULLDROPPED;
		cor_schedule_retransmit_conn(cr, 1, 0);
		if (rc != NET_XMIT_SUCCESS)
			return RC_SENDRETRANS_QUEUEFULL;
	} else {
		struct cor_control_msg_out *cm;
		char *buf;

		buf = kmalloc(cr->length, GFP_ATOMIC);
		if (unlikely(buf == 0))
			return RC_SENDRETRANS_OOM;

		cm = cor_alloc_control_msg(nb, ACM_PRIORITY_LOW);
		if (unlikely(cm == 0)) {
			kfree(buf);
			return RC_SENDRETRANS_OOM;
		}

		cor_databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);

		cor_send_conndata(cm, trgt_out_l->trgt.out.conn_id, cr->seqno,
				buf, buf, cr->length, cr->windowused,
				flush, trgt_out_l->is_highlatency, cr);
	}

	*bytes_sent += cr->length;

	return RC_SENDRETRANS_OK;
}

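/*
 * If the retransmit is larger than the current mss or the remaining send
 * window, split it so that the first part fits; the remainder becomes a
 * new scheduled entry directly behind it on the conn retrans_list.
 */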
static int _cor_send_retrans_splitcr_ifneeded(
		struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr)
{
	__u32 targetmss = cor_mss_conndata(nb_retransconnlocked,
			trgt_out_l->is_highlatency != 0);
	__u64 windowlimit = cor_seqno_clean(
			trgt_out_l->trgt.out.seqno_windowlimit -
			cr->seqno);
	__u32 maxsize = targetmss;

	if (windowlimit < maxsize)
		maxsize = windowlimit;

	if (unlikely(cr->length > maxsize)) {
		struct cor_conn_retrans *cr2 = cor_prepare_conn_retrans(
				trgt_out_l, cr->seqno + maxsize,
				cr->length - maxsize, cr->windowused, cr, 1);

		if (unlikely(cr2 == 0))
			return RC_SENDRETRANS_OOM;

		cr2->timeout = cr->timeout;

		if (trgt_out_l->is_highlatency)
			list_add(&(cr2->timeout_list),
					&(nb_retransconnlocked->retrans_conn_highlatency_list));
		else
			list_add(&(cr2->timeout_list),
					&(nb_retransconnlocked->retrans_conn_lowlatency_list));
		/* cr2: kref from alloc goes to kref for timeout_list */
		cr2->state = CONN_RETRANS_SCHEDULED;

		cr->length = maxsize;
	}

	return RC_SENDRETRANS_OK;
}

static int _cor_send_retrans(struct cor_neighbor *nb,
		struct cor_conn_retrans *cr, __u64 *bytes_sent)
{
	struct cor_conn *trgt_out_o = cr->trgt_out_o;
	int rc;

	spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	BUG_ON(trgt_out_o->trgt.out.nb != nb);

	spin_lock_bh(&(nb->retrans_conn_lock));
	if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
		return 0;
	}

	BUG_ON(trgt_out_o->isreset != 0);

	BUG_ON(cor_seqno_before(cr->seqno, trgt_out_o->trgt.out.seqno_acked));

	if (cor_seqno_after_eq(cr->seqno,
			trgt_out_o->trgt.out.seqno_windowlimit)) {
		BUG_ON(cr->state != CONN_RETRANS_SENDING);
		cr->state = CONN_RETRANS_LOWWINDOW;
		if (likely(trgt_out_o->trgt.out.retrans_lowwindow != 65535))
			trgt_out_o->trgt.out.retrans_lowwindow++;

		spin_unlock_bh(&(nb->retrans_conn_lock));
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
		return 0;
	}

	rc = _cor_send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);

	spin_unlock_bh(&(nb->retrans_conn_lock));

	cor_conn_kref_get(trgt_out_o, "stack");

	if (rc == RC_SENDRETRANS_OK)
		rc = __cor_send_retrans(nb, trgt_out_o, cr, bytes_sent);

	if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
		spin_lock_bh(&(nb->retrans_conn_lock));
		if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		} else if (likely(cr->state == CONN_RETRANS_SENDING)) {
			if (rc == RC_SENDRETRANS_OOM)
				cr->timeout = jiffies + 1;
			if (trgt_out_o->is_highlatency)
				list_add(&(cr->timeout_list),
						&(nb->retrans_conn_highlatency_list));
			else
				list_add(&(cr->timeout_list),
						&(nb->retrans_conn_lowlatency_list));
			kref_get(&(cr->ref));
			cr->state = CONN_RETRANS_SCHEDULED;
		} else {
			BUG();
		}
		spin_unlock_bh(&(nb->retrans_conn_lock));
	}

	spin_unlock_bh(&(trgt_out_o->rcv_lock));

	cor_conn_kref_put(trgt_out_o, "stack");

	return (rc == RC_SENDRETRANS_OOM ||
			rc == RC_SENDRETRANS_QUEUEFULL ||
			rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
}

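/*
 * qos resume handler for conn retransmits: send everything whose timeout
 * has expired and rearm the timer for the rest. Returns QOS_RESUME_CONG if
 * the device queue filled up, so that processing is resumed later.
 */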
int cor_send_retrans(struct cor_neighbor *nb, int *sent)
{
	int queuefull = 0;
	int nbstate = cor_get_neigh_state(nb);
	__u64 bytes_sent = 0;

	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		return QOS_RESUME_DONE;
	} else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
		/*
		 * cor_cancel_all_conn_retrans_nb should not be needed, because
		 * cor_reset_all_conns calls cor_cancel_all_conn_retrans
		 */
		cor_cancel_all_conn_retrans_nb(nb);
		return QOS_RESUME_DONE;
	}

	while (1) {
		struct cor_conn_retrans *cr = 0;

		spin_lock_bh(&(nb->retrans_conn_lock));

		cr = cor_peek_next_conn_retrans(nb);
		if (cr == 0) {
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

		if (time_after(cr->timeout, jiffies)) {
			cor_reschedule_conn_retrans_timer(nb);
			spin_unlock_bh(&(nb->retrans_conn_lock));
			break;
		}

		list_del(&(cr->timeout_list));
		cr->state = CONN_RETRANS_SENDING;

		spin_unlock_bh(&(nb->retrans_conn_lock));

		queuefull = _cor_send_retrans(nb, cr, &bytes_sent);
		kref_put(&(cr->ref), cor_free_connretrans); /* timeout_list */
		if (queuefull) {
			break;
		} else {
			*sent = 1;
		}
	}

	if (bytes_sent > 0)
		cor_nbcongwin_data_retransmitted(nb, bytes_sent);

	return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
}

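/* timer handler: hand the work over to the qos queue, drop the timer ref */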
void cor_retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
{
	struct cor_neighbor *nb = container_of(retrans_conn_timer,
			struct cor_neighbor, retrans_conn_timer);

	cor_qos_enqueue(nb->queue, &(nb->rb_cr), 0, ns_to_ktime(0),
			QOS_CALLER_CONN_RETRANS, 0);
	cor_nb_kref_put(nb, "retransmit_conn_timer");
}

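/*
 * An out-of-order ack acknowledged a range strictly inside this retransmit:
 * shrink cr to the part before the acked range and create a new entry (cr2)
 * for the part behind it, inheriting timeout and state.
 */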
static void cor_conn_ack_ooo_rcvd_splitcr(struct cor_conn *trgt_out_l,
		struct cor_conn_retrans *cr, __u64 seqno_ooo, __u32 length,
		__u64 *bytes_acked)
{
	struct cor_conn_retrans *cr2;
	__u64 seqno_cr2start;
	__u32 oldcrlength = cr->length;

	if (cr->state != CONN_RETRANS_SCHEDULED &&
			cr->state != CONN_RETRANS_LOWWINDOW)
		return;

	seqno_cr2start = seqno_ooo + length;
	cr2 = cor_prepare_conn_retrans(trgt_out_l, seqno_cr2start,
			cor_seqno_clean(cr->seqno + cr->length -
			seqno_cr2start),
			cr->windowused, cr, 1);

	if (unlikely(cr2 == 0))
		return;

	BUG_ON(cr2->length > cr->length);

	cr2->timeout = cr->timeout;
	cr2->state = cr->state;

	if (cr->state == CONN_RETRANS_SCHEDULED) {
		list_add(&(cr2->timeout_list), &(cr->timeout_list));
		kref_get(&(cr2->ref));
	}

	BUG_ON(cor_seqno_clean(seqno_ooo - cr->seqno) > cr->length);

	cr->length = cor_seqno_clean(seqno_ooo - cr->seqno);
	BUG_ON(cr->length + length + cr2->length != oldcrlength);
	kref_put(&(cr2->ref), cor_kreffree_bug); /* alloc */

	*bytes_acked += length;
}

void cor_conn_ack_ooo_rcvd(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *trgt_out, __u64 seqno_ooo, __u32 length,
		__u64 *bytes_acked)
{
	struct list_head *curr;

	if (unlikely(length == 0))
		return;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->trgt.out.nb != nb))
		goto out;
	if (unlikely(trgt_out->trgt.out.conn_id != conn_id))
		goto out;

	cor_nb_kref_get(nb, "stack");
	spin_lock_bh(&(nb->retrans_conn_lock));

	curr = trgt_out->trgt.out.retrans_list.next;
	while (curr != &(trgt_out->trgt.out.retrans_list)) {
		struct cor_conn_retrans *cr = container_of(curr,
				struct cor_conn_retrans, conn_list);
		int ack_covers_start = cor_seqno_after_eq(cr->seqno, seqno_ooo);
		int ack_covers_end = cor_seqno_before_eq(cr->seqno + cr->length,
				seqno_ooo + length);

		curr = curr->next;

		if (cor_seqno_before(cr->seqno + cr->length, seqno_ooo))
			continue;

		if (cor_seqno_after(cr->seqno, seqno_ooo + length))
			break;

		if (likely(ack_covers_start && ack_covers_end)) {
			cor_cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
			cor_reschedule_conn_retrans_timer(nb);
		} else if (ack_covers_start) {
			/* acked range covers the start of cr: trim its head */
			__u32 diff = seqno_ooo + length - cr->seqno;

			BUG_ON(diff >= cr->length);
			cr->seqno += diff;
			cr->length -= diff;
			*bytes_acked += diff;
		} else if (ack_covers_end) {
			/* acked range covers the end of cr: trim its tail */
			__u32 diff = cr->seqno + cr->length - seqno_ooo;

			BUG_ON(diff >= cr->length);
			cr->length -= diff;
			*bytes_acked += diff;
		} else {
			cor_conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
					length, bytes_acked);
			break;
		}
	}

	if (unlikely(list_empty(&(trgt_out->trgt.out.retrans_list)))) {
		trgt_out->trgt.out.seqno_acked =
				trgt_out->trgt.out.seqno_nextsend;
	} else {
		struct cor_conn_retrans *cr = container_of(
				trgt_out->trgt.out.retrans_list.next,
				struct cor_conn_retrans, conn_list);

		if (cor_seqno_after(cr->seqno,
				trgt_out->trgt.out.seqno_acked))
			trgt_out->trgt.out.seqno_acked = cr->seqno;
	}

	spin_unlock_bh(&(nb->retrans_conn_lock));
	cor_nb_kref_put(nb, "stack");

out:
	spin_unlock_bh(&(trgt_out->rcv_lock));
}

static void _cor_conn_ack_rcvd_nosendwin(struct cor_conn *trgt_out_l)
{
	if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
			trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
		trgt_out_l->bufsize.state = BUFSIZE_NOACTION;

	if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
		trgt_out_l->bufsize.act.noact.bytesleft = max(
				trgt_out_l->bufsize.act.noact.bytesleft,
				(__u32) BUF_OUT_WIN_NOK_NOINCR);

	trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_l->bufsize.ignore_rcv_lowbuf,
			(__u32) BUF_OUT_WIN_NOK_NOINCR);
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_reschedule_lowwindow_retrans(struct cor_conn *trgt_out_l)
{
	struct list_head *lh = trgt_out_l->trgt.out.retrans_list.next;
	int cnt = 0;

	while (trgt_out_l->trgt.out.retrans_lowwindow > 0 && cnt < 100) {
		struct cor_conn_retrans *cr;

		if (unlikely(lh == &(trgt_out_l->trgt.out.retrans_list))) {
			BUG_ON(trgt_out_l->trgt.out.retrans_lowwindow != 65535);
			trgt_out_l->trgt.out.retrans_lowwindow = 0;
			break;
		}

		cr = container_of(lh, struct cor_conn_retrans, conn_list);

		if (cor_seqno_after_eq(cr->seqno,
				trgt_out_l->trgt.out.seqno_windowlimit)) {
			break;
		}

		if (cr->state == CONN_RETRANS_LOWWINDOW)
			cor_schedule_retransmit_conn(cr, 1, 1);

		lh = lh->next;
		cnt++;
	}
}

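/*
 * Process a cumulative ack: advance seqno_acked, cancel acked retransmits,
 * enlarge the send window (seqno_windowlimit) if the ack carries a window
 * and wake up retransmits that were blocked on a too small window.
 */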
void cor_conn_ack_rcvd(struct cor_neighbor *nb, __u32 conn_id,
		struct cor_conn *trgt_out, __u64 seqno, int setwindow,
		__u16 window, __u8 bufsize_changerate, __u64 *bytes_acked)
{
	int seqno_advanced = 0;
	int window_enlarged = 0;

	spin_lock_bh(&(trgt_out->rcv_lock));

	if (unlikely(trgt_out->isreset != 0))
		goto out;
	if (unlikely(trgt_out->targettype != TARGET_OUT))
		goto out;
	if (unlikely(trgt_out->trgt.out.nb != nb))
		goto out;
	if (unlikely(cor_get_connid_reverse(trgt_out->trgt.out.conn_id) !=
			conn_id))
		goto out;

	if (unlikely(cor_seqno_after(seqno,
			trgt_out->trgt.out.seqno_nextsend) ||
			cor_seqno_before(seqno,
			trgt_out->trgt.out.seqno_acked)))
		goto out;

	if (setwindow) {
		__u64 windowdec = cor_dec_window(window);

		if (likely(cor_seqno_after(seqno,
				trgt_out->trgt.out.seqno_acked)) ||
				cor_seqno_after(seqno + windowdec,
				trgt_out->trgt.out.seqno_windowlimit)) {
			trgt_out->trgt.out.seqno_windowlimit = seqno +
					windowdec;
			window_enlarged = 1;
		}
		trgt_out->trgt.out.remote_bufsize_changerate =
				bufsize_changerate;
	}

	if (cor_seqno_after(seqno, trgt_out->trgt.out.seqno_acked))
		seqno_advanced = 1;

	if (seqno_advanced == 0 && window_enlarged == 0)
		goto out;

	cor_nb_kref_get(nb, "stack");
	spin_lock_bh(&(nb->retrans_conn_lock));

	if (seqno_advanced) {
		trgt_out->trgt.out.seqno_acked = seqno;
		cor_cancel_acked_conn_retrans(trgt_out, bytes_acked);
	}

	if (window_enlarged)
		cor_reschedule_lowwindow_retrans(trgt_out);

	spin_unlock_bh(&(nb->retrans_conn_lock));
	cor_nb_kref_put(nb, "stack");

	if (seqno_advanced)
		cor_databuf_ack(trgt_out, trgt_out->trgt.out.seqno_acked);

	if (cor_seqno_eq(trgt_out->trgt.out.seqno_acked,
			trgt_out->trgt.out.seqno_nextsend))
		_cor_conn_ack_rcvd_nosendwin(trgt_out);

out:
	if (seqno_advanced || window_enlarged)
		cor_flush_buf(trgt_out);

	spin_unlock_bh(&(trgt_out->rcv_lock));

	cor_wake_sender(trgt_out);
}

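/*
 * Scheduled retransmits that are adjacent in seqno space and share the same
 * timeout are merged to save list entries and packets; the absorbed entry
 * is cancelled after its range has been folded into cr.
 */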
static void cor_try_combine_conn_retrans_prev(
		struct cor_neighbor *nb_retransconnlocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
	struct cor_conn_retrans *cr_prev;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.prev == &(trgt_out_lx->trgt.out.retrans_list))
		return;

	cr_prev = container_of(cr->conn_list.prev, struct cor_conn_retrans,
			conn_list);

	if (cr_prev->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_prev->timeout != cr->timeout)
		return;
	if (!cor_seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
		return;

	cr->seqno -= cr_prev->length;
	cr->length += cr_prev->length;

	cor_cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
			&bytes_dummyacked);
}

static void cor_try_combine_conn_retrans_next(
		struct cor_neighbor *nb_retranslocked,
		struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
	struct cor_conn_retrans *cr_next;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.next == &(trgt_out_lx->trgt.out.retrans_list))
		return;

	cr_next = container_of(cr->conn_list.next, struct cor_conn_retrans,
			conn_list);

	if (cr_next->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_next->timeout != cr->timeout)
		return;
	if (!cor_seqno_eq(cr->seqno + cr->length, cr_next->seqno))
		return;

	cr->length += cr_next->length;

	cor_cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
			&bytes_dummyacked);
}

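/*
 * (Re)schedule a retransmit: compute its timeout from the measured neighbor
 * latency plus the conn ack delay the remote may apply, append it to the
 * timeout list matching the conn's latency class and try to merge it with
 * its neighbors in seqno space.
 */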
void cor_schedule_retransmit_conn(struct cor_conn_retrans *cr, int connlocked,
		int nbretransconn_locked)
{
	struct cor_conn *trgt_out_o = cr->trgt_out_o;
	struct cor_neighbor *nb;
	int first;

	if (connlocked == 0)
		spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	nb = trgt_out_o->trgt.out.nb;
	/* use the remote ack delay matching this conn's latency class */
	if (trgt_out_o->is_highlatency)
		cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
				atomic_read(&(nb->latency_stddev_retrans_us)),
				atomic_read(&(nb->max_remote_ackconn_highlat_delay_us)));
	else
		cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
				atomic_read(&(nb->latency_stddev_retrans_us)),
				atomic_read(&(nb->max_remote_ackconn_lowlat_delay_us)));
	if (nbretransconn_locked == 0)
		spin_lock_bh(&(nb->retrans_conn_lock));

	cor_nb_kref_get(nb, "stack");

	BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);

	if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		goto out;
	} else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
		BUG_ON(trgt_out_o->trgt.out.retrans_lowwindow == 0);
		if (likely(trgt_out_o->trgt.out.retrans_lowwindow != 65535))
			trgt_out_o->trgt.out.retrans_lowwindow--;
	}

	if (trgt_out_o->is_highlatency) {
		first = unlikely(list_empty(
				&(nb->retrans_conn_highlatency_list)));
		list_add_tail(&(cr->timeout_list),
				&(nb->retrans_conn_highlatency_list));
	} else {
		first = unlikely(list_empty(
				&(nb->retrans_conn_lowlatency_list)));
		list_add_tail(&(cr->timeout_list),
				&(nb->retrans_conn_lowlatency_list));
	}
	kref_get(&(cr->ref));
	cr->state = CONN_RETRANS_SCHEDULED;

	if (unlikely(first)) {
		cor_reschedule_conn_retrans_timer(nb);
	} else {
		cor_try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
		cor_try_combine_conn_retrans_next(nb, trgt_out_o, cr);
	}

out:
	if (nbretransconn_locked == 0)
		spin_unlock_bh(&(nb->retrans_conn_lock));

	cor_nb_kref_put(nb, "stack");

	if (connlocked == 0)
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
}

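/*
 * Send new (not yet transmitted) data as a separate skb. A retransmit entry
 * is prepared beforehand; if the device queue drops the packet, the data is
 * pushed back into the buffer and the entry is cancelled again.
 */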
static int _cor_flush_out_skb(struct cor_conn *trgt_out_lx, __u32 len)
{
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	__u64 seqno;
	struct cor_conn_retrans *cr;
	struct sk_buff *skb;
	char *dst;
	__u8 flush = 0;
	int rc;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	seqno = trgt_out_lx->trgt.out.seqno_nextsend;
	skb = cor_create_packet_conndata(trgt_out_lx->trgt.out.nb, len,
			GFP_ATOMIC, trgt_out_lx->trgt.out.conn_id, seqno,
			trgt_out_lx->trgt.out.lastsend_windowused, flush);
	if (unlikely(skb == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
			trgt_out_lx->trgt.out.lastsend_windowused, 0, 0);
	if (unlikely(cr == 0)) {
		kfree_skb(skb);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	dst = skb_put(skb, len);

	cor_databuf_pull(trgt_out_lx, dst, len);

	rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
	if (rc == NET_XMIT_DROP) {
		cor_databuf_unpull(trgt_out_lx, len);
		spin_lock_bh(&(nb->retrans_conn_lock));
		cor_cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
		spin_unlock_bh(&(nb->retrans_conn_lock));
		kref_put(&(cr->ref), cor_free_connretrans); /* alloc */
		return RC_FLUSH_CONN_OUT_CONG;
	}

	trgt_out_lx->trgt.out.seqno_nextsend += len;
	cor_nbcongwin_data_sent(nb, len);
	cor_schedule_retransmit_conn(cr, 1, 0);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		cor_update_src_sock_sndspeed(trgt_out_lx, len);

	kref_put(&(cr->ref), cor_free_connretrans); /* alloc */

	return (rc == NET_XMIT_SUCCESS) ?
			RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
}

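/*
 * Same as _cor_flush_out_skb(), but the data is handed to the control
 * message layer so it can be bundled with other messages in one packet.
 */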
static int _cor_flush_out_conndata(struct cor_conn *trgt_out_lx, __u32 len)
{
	__u64 seqno;
	struct cor_control_msg_out *cm;
	struct cor_conn_retrans *cr;
	char *buf;
	__u8 flush = 0;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	buf = kmalloc(len, GFP_ATOMIC);

	if (unlikely(buf == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cm = cor_alloc_control_msg(trgt_out_lx->trgt.out.nb, ACM_PRIORITY_LOW);
	if (unlikely(cm == 0)) {
		kfree(buf);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	seqno = trgt_out_lx->trgt.out.seqno_nextsend;

	cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
			trgt_out_lx->trgt.out.lastsend_windowused, 0, 0);
	if (unlikely(cr == 0)) {
		kfree(buf);
		cor_free_control_msg(cm);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	cor_databuf_pull(trgt_out_lx, buf, len);
	trgt_out_lx->trgt.out.seqno_nextsend += len;
	cor_nbcongwin_data_sent(trgt_out_lx->trgt.out.nb, len);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		cor_update_src_sock_sndspeed(trgt_out_lx, len);

	cor_send_conndata(cm, trgt_out_lx->trgt.out.conn_id, seqno, buf, buf,
			len, trgt_out_lx->trgt.out.lastsend_windowused, flush,
			trgt_out_lx->is_highlatency, cr);

	kref_put(&(cr->ref), cor_free_connretrans); /* alloc */

	return RC_FLUSH_CONN_OUT_OK;
}

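/*
 * Returns 1 if the window announced by the receiver is (nearly) used up:
 * less than WINDOW_ENCODE_MIN left, or more data buffered than half of the
 * remaining window.
 */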
int cor_srcin_buflimit_reached(struct cor_conn *src_in_lx)
{
	__u64 window_left;

	if (unlikely(cor_seqno_before(src_in_lx->src.in.window_seqnolimit,
			src_in_lx->src.in.next_seqno)))
		return 1;

	window_left = cor_seqno_clean(src_in_lx->src.in.window_seqnolimit -
			src_in_lx->src.in.next_seqno);

	if (window_left < WINDOW_ENCODE_MIN)
		return 1;

	if (window_left / 2 < src_in_lx->data_buf.read_remaining)
		return 1;

	return 0;
}

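/*
 * Round a send budget down to a "nice" packet size: values below 128 are
 * used as is, values up to 4095 are rounded down to a power of two (e.g.
 * 3000 becomes 2048) and larger values to a multiple of 4096.
 */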
static __u32 cor_maxsend_left_to_len(__u32 maxsend_left)
{
	__u32 i;

	if (maxsend_left < 128)
		return maxsend_left;

	for (i = 128; i < 4096;) {
		if (i * 2 > maxsend_left)
			return i;
		i = i * 2;
	}

	return maxsend_left - maxsend_left % 4096;
}

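/*
 * Track how much of the send window was in use at the last send as a 5 bit
 * value: lastsend_windowused = ceil(31 * bytes_ackpending / total_window),
 * i.e. 31 when the whole window is in flight.
 */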
static void cor_set_last_windowused(struct cor_conn *trgt_out_lx)
{
	__u64 total_window;
	__u64 bytes_ackpending;

	BUG_ON(cor_seqno_before(trgt_out_lx->trgt.out.seqno_windowlimit,
			trgt_out_lx->trgt.out.seqno_acked));
	BUG_ON(cor_seqno_before(trgt_out_lx->trgt.out.seqno_nextsend,
			trgt_out_lx->trgt.out.seqno_acked));

	total_window = cor_seqno_clean(
			trgt_out_lx->trgt.out.seqno_windowlimit -
			trgt_out_lx->trgt.out.seqno_acked);
	bytes_ackpending = cor_seqno_clean(
			trgt_out_lx->trgt.out.seqno_nextsend -
			trgt_out_lx->trgt.out.seqno_acked);

	BUG_ON(bytes_ackpending > total_window);
	BUG_ON(bytes_ackpending > (U64_MAX / 64));

	trgt_out_lx->trgt.out.lastsend_windowused = div64_u64(
			bytes_ackpending * 31 + total_window - 1, total_window);
}

static void _cor_flush_out_ignore_lowbuf(struct cor_conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

static __u64 cor_get_windowlimit(struct cor_conn *trgt_out_lx)
{
	if (unlikely(cor_seqno_before(trgt_out_lx->trgt.out.seqno_windowlimit,
			trgt_out_lx->trgt.out.seqno_nextsend)))
		return 0;

	return cor_seqno_clean(trgt_out_lx->trgt.out.seqno_windowlimit -
			trgt_out_lx->trgt.out.seqno_nextsend);
}

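/*
 * Main send path: transmit as much buffered data as the send budget
 * (maxsend), the announced window and the congestion window allow. Full
 * mss sized packets are sent first; a smaller rest is only sent if
 * flushing was requested or more data is unlikely to arrive soon.
 */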
int _cor_flush_out(struct cor_conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos, int maxsend_forcedelay)
{
	struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;

	__u32 targetmss;
	int nbstate;
	__u32 maxsend_left = maxsend;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->trgt.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->trgt.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (from_qos == 0 && cor_qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	cor_get_conn_idletime(trgt_out_lx);

	nbstate = cor_get_neigh_state(nb);

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u\n", trgt_out_l,
	 *		cor_get_windowlimit(trgt_out_l),
	 *		trgt_out_l->data_buf.read_remaining); */

	targetmss = cor_mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->trgt.out.lastsend_windowused = 31;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (cor_nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (likely(cor_send_conndata_as_skb(nb, targetmss)))
			rc = _cor_flush_out_skb(trgt_out_lx, targetmss);
		else
			rc = _cor_flush_out_conndata(trgt_out_lx, targetmss);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
			cor_set_last_windowused(trgt_out_lx);
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left >= 65536 || (
					maxsend_left == maxsend &&
					maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0 &&
					!maxsend_forcedelay)) {
				len = cor_maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx, 1) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				cor_srcin_buflimit_reached(trgt_out_lx)
				== 0 && (
				cor_seqno_eq(
				trgt_out_lx->trgt.out.seqno_nextsend,
				trgt_out_lx->trgt.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0 ||
				LOWLATENCY_SEND_UNFLUSHED_DATA != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cor_conn_src_unconn_write_allowed(
				trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				cor_seqno_eq(
				trgt_out_lx->trgt.out.seqno_nextsend,
				trgt_out_lx->trgt.out.seqno_acked) == 0)) {
			trgt_out_lx->trgt.out.lastsend_windowused = 31;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (cor_nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (len > windowlimit) {
			len = windowlimit;
			_cor_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (cor_send_conndata_as_skb(nb, len))
			rc = _cor_flush_out_skb(trgt_out_lx, len);
		else
			rc = _cor_flush_out_conndata(trgt_out_lx, len);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
			cor_set_last_windowused(trgt_out_lx);
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}

int __init cor_snd_init(void)
{
	cor_connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct cor_conn_retrans), 8, 0, 0);
	if (unlikely(cor_connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

void __exit cor_snd_exit2(void)
{
	kmem_cache_destroy(cor_connretrans_slab);
	cor_connretrans_slab = 0;
}

MODULE_LICENSE("GPL");