net/cor/conn_trgt_out.c
/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
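
/*
 * This file implements the sending side of a connection: flushing buffered
 * data to the neighbor, tracking unacked ranges with struct cor_conn_retrans,
 * and retransmitting ranges on timeout or when acks (cumulative or
 * out-of-order) shrink them.
 */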

#include <linux/gfp.h>
#include <linux/jiffies.h>
#include <linux/slab.h>

#include "cor.h"

static struct kmem_cache *cor_connretrans_slab;
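
/*
 * Lifecycle of a struct cor_conn_retrans, as used in this file:
 *   CONN_RETRANS_INITIAL    just allocated, not yet scheduled
 *   CONN_RETRANS_SCHEDULED  queued on nb->retrans_conn_list with a timeout
 *   CONN_RETRANS_SENDING    dequeued by cor_send_retrans(), xmit in progress
 *   CONN_RETRANS_LOWWINDOW  beyond the send window, waiting for window updates
 *   CONN_RETRANS_ACKED      fully acked; freed when the last kref is dropped
 */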

static void cor_free_connretrans(struct kref *ref)
{
        struct cor_conn_retrans *cr = container_of(ref, struct cor_conn_retrans,
                        ref);
        struct cor_conn *cn = cr->trgt_out_o;

        BUG_ON(cr->state != CONN_RETRANS_ACKED);

        kmem_cache_free(cor_connretrans_slab, cr);
        kref_put(&(cn->ref), cor_free_conn);
}

void cor_reschedule_conn_retrans_timer(
                struct cor_neighbor *nb_retransconnlocked)
{
        struct cor_conn_retrans *cr = 0;

        if (list_empty(&(nb_retransconnlocked->retrans_conn_list)))
                return;

        cr = container_of(nb_retransconnlocked->retrans_conn_list.next,
                        struct cor_conn_retrans, timeout_list);

        if (time_before_eq(cr->timeout, jiffies)) {
                cor_qos_enqueue(nb_retransconnlocked->queue,
                                &(nb_retransconnlocked->rb_cr), ns_to_ktime(0),
                                QOS_CALLER_CONN_RETRANS);
        } else {
                if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
                                cr->timeout) == 0) {
                        kref_get(&(nb_retransconnlocked->ref));
                }
        }
}

/**
 * warning:
 * caller must also call kref_get/put, see cor_reschedule_conn_retrans_timer
 */
static void cor_cancel_conn_retrans(struct cor_neighbor *nb_retransconnlocked,
                struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr,
                __u64 *bytes_acked)
{
        if (unlikely(cr->state == CONN_RETRANS_ACKED))
                return;

        if (cr->state == CONN_RETRANS_SCHEDULED) {
                list_del(&(cr->timeout_list));
        } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
                BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
                if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
                        trgt_out_lx->target.out.retrans_lowwindow--;
        }

        if (cr->state != CONN_RETRANS_INITIAL)
                *bytes_acked += cr->length;

        list_del(&(cr->conn_list));
        cr->state = CONN_RETRANS_ACKED;

        kref_put(&(cr->ref), cor_free_connretrans);
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_cancel_acked_conn_retrans(struct cor_conn *trgt_out_l,
                __u64 *bytes_acked)
{
        __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;

        while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
                struct cor_conn_retrans *cr = container_of(
                                trgt_out_l->target.out.retrans_list.next,
                                struct cor_conn_retrans, conn_list);

                if (cor_seqno_after(cr->seqno + cr->length, seqno_acked)) {
                        if (cor_seqno_before(cr->seqno, seqno_acked)) {
                                *bytes_acked += cor_seqno_clean(seqno_acked -
                                                cr->seqno);
                                cr->length -= cor_seqno_clean(seqno_acked -
                                                cr->seqno);
                                cr->seqno = seqno_acked;
                        }

                        break;
                }

                cor_cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l,
                                cr, bytes_acked);
        }

        cor_reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
}

void cor_cancel_all_conn_retrans(struct cor_conn *trgt_out_lx)
{
        struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
        __u64 bytes_acked = 0;

        spin_lock_bh(&(nb->retrans_conn_lock));

        while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
                struct cor_conn_retrans *cr = container_of(
                                trgt_out_lx->target.out.retrans_list.next,
                                struct cor_conn_retrans, conn_list);
                BUG_ON(cr->trgt_out_o != trgt_out_lx);

                cor_cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
        }

        cor_reschedule_conn_retrans_timer(nb);

        spin_unlock_bh(&(nb->retrans_conn_lock));

        if (bytes_acked > 0)
                cor_nbcongwin_data_acked(nb, bytes_acked);
}

static void cor_cancel_all_conn_retrans_nb(struct cor_neighbor *nb)
{
        __u64 bytes_acked = 0;

        while (1) {
                struct cor_conn_retrans *cr;

                spin_lock_bh(&(nb->retrans_conn_lock));

                if (list_empty(&(nb->retrans_conn_list))) {
                        spin_unlock_bh(&(nb->retrans_conn_lock));
                        break;
                }

                cr = container_of(nb->retrans_conn_list.next,
                                struct cor_conn_retrans, timeout_list);

                kref_get(&(cr->ref));

                spin_unlock_bh(&(nb->retrans_conn_lock));

                /* lock order: conn rcv_lock first, then retrans_conn_lock */
                spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
                spin_lock_bh(&(nb->retrans_conn_lock));

                if (likely(cr == container_of(nb->retrans_conn_list.next,
                                struct cor_conn_retrans, timeout_list)))
                        cor_cancel_conn_retrans(nb, cr->trgt_out_o, cr,
                                        &bytes_acked);

                spin_unlock_bh(&(nb->retrans_conn_lock));
                spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));

                kref_put(&(cr->ref), cor_free_connretrans);
        }

        if (bytes_acked > 0)
                cor_nbcongwin_data_acked(nb, bytes_acked);
}
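
/*
 * Allocate a cor_conn_retrans covering [seqno, seqno + len) and link it into
 * the conn's retrans_list (directly behind cr_splitted if the range results
 * from a split). The returned cr holds two references: one owned by the list
 * membership (dropped by cor_cancel_conn_retrans()) and one for the caller.
 */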

static struct cor_conn_retrans *cor_prepare_conn_retrans(
                struct cor_conn *trgt_out_l, __u64 seqno, __u32 len,
                __u8 snd_delayed_lowbuf, struct cor_conn_retrans *cr_splitted,
                int retransconnlocked)
{
        struct cor_neighbor *nb = trgt_out_l->target.out.nb;

        struct cor_conn_retrans *cr = kmem_cache_alloc(cor_connretrans_slab,
                        GFP_ATOMIC);

        if (unlikely(cr == 0))
                return 0;

        BUG_ON(trgt_out_l->isreset != 0);

        memset(cr, 0, sizeof(struct cor_conn_retrans));
        cr->trgt_out_o = trgt_out_l;
        kref_get(&(trgt_out_l->ref));
        cr->seqno = seqno;
        cr->length = len;
        cr->snd_delayed_lowbuf = snd_delayed_lowbuf;
        kref_init(&(cr->ref));

        kref_get(&(cr->ref));
        if (retransconnlocked == 0)
                spin_lock_bh(&(nb->retrans_conn_lock));

        if (cr_splitted != 0)
                list_add(&(cr->conn_list), &(cr_splitted->conn_list));
        else
                list_add_tail(&(cr->conn_list),
                                &(cr->trgt_out_o->target.out.retrans_list));

        if (retransconnlocked == 0)
                spin_unlock_bh(&(nb->retrans_conn_lock));

        return cr;
}

#define RC_SENDRETRANS_OK 0
#define RC_SENDRETRANS_OOM 1
#define RC_SENDRETRANS_QUEUEFULL 2
#define RC_SENDRETRANS_QUEUEFULLDROPPED 3

static int __cor_send_retrans(struct cor_neighbor *nb,
                struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr,
                __u64 *bytes_sent)
{
        __u8 flush = 0;

        BUG_ON(cr->length == 0);

        if (trgt_out_l->flush != 0 && cor_seqno_eq(cr->seqno + cr->length,
                        trgt_out_l->target.out.seqno_nextsend) &&
                        trgt_out_l->data_buf.read_remaining == 0)
                flush = 1;

        if (cor_send_conndata_as_skb(nb, cr->length)) {
                struct sk_buff *skb;
                char *dst;
                int rc;

                skb = cor_create_packet_conndata(nb, cr->length, GFP_ATOMIC,
                                trgt_out_l->target.out.conn_id, cr->seqno,
                                cr->snd_delayed_lowbuf, flush);
                if (unlikely(skb == 0))
                        return RC_SENDRETRANS_OOM;

                dst = skb_put(skb, cr->length);

                cor_databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);

                rc = cor_dev_queue_xmit(skb, nb->queue,
                                QOS_CALLER_CONN_RETRANS);
                if (rc == NET_XMIT_DROP)
                        return RC_SENDRETRANS_QUEUEFULLDROPPED;
                cor_schedule_retransmit_conn(cr, 1, 0);
                if (rc != NET_XMIT_SUCCESS)
                        return RC_SENDRETRANS_QUEUEFULL;
        } else {
                struct cor_control_msg_out *cm;
                char *buf;

                buf = kmalloc(cr->length, GFP_ATOMIC);
                if (unlikely(buf == 0))
                        return RC_SENDRETRANS_OOM;

                cm = cor_alloc_control_msg(nb, ACM_PRIORITY_LOW);
                if (unlikely(cm == 0)) {
                        kfree(buf);
                        return RC_SENDRETRANS_OOM;
                }

                cor_databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);

                cor_send_conndata(cm, trgt_out_l->target.out.conn_id,
                                cr->seqno, buf, buf, cr->length,
                                cr->snd_delayed_lowbuf, flush,
                                trgt_out_l->is_highlatency, cr);
        }

        *bytes_sent += cr->length;

        return RC_SENDRETRANS_OK;
}
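
/*
 * A queued retransmit may be larger than what the current MSS or the remote
 * receive window allows. If so, split off the tail into a second
 * cor_conn_retrans that is scheduled with the same timeout, and shrink the
 * original range so it can be sent immediately.
 */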

static int _cor_send_retrans_splitcr_ifneeded(
                struct cor_neighbor *nb_retransconnlocked,
                struct cor_conn *trgt_out_l, struct cor_conn_retrans *cr)
{
        __u32 targetmss = cor_mss_conndata(nb_retransconnlocked,
                        trgt_out_l->is_highlatency != 0);
        __u64 windowlimit = cor_seqno_clean(
                        trgt_out_l->target.out.seqno_windowlimit -
                        cr->seqno);
        __u32 maxsize = targetmss;

        if (windowlimit < maxsize)
                maxsize = windowlimit;

        if (unlikely(cr->length > maxsize)) {
                struct cor_conn_retrans *cr2 = cor_prepare_conn_retrans(
                                trgt_out_l, cr->seqno + maxsize,
                                cr->length - maxsize, cr->snd_delayed_lowbuf,
                                cr, 1);

                if (unlikely(cr2 == 0))
                        return RC_SENDRETRANS_OOM;

                cr2->timeout = cr->timeout;

                list_add(&(cr2->timeout_list),
                                &(nb_retransconnlocked->retrans_conn_list));
                cr2->state = CONN_RETRANS_SCHEDULED;

                cr->length = maxsize;
        }

        return RC_SENDRETRANS_OK;
}

static int _cor_send_retrans(struct cor_neighbor *nb,
                struct cor_conn_retrans *cr, __u64 *bytes_sent)
{
        struct cor_conn *trgt_out_o = cr->trgt_out_o;
        int rc = RC_SENDRETRANS_OK;

        spin_lock_bh(&(trgt_out_o->rcv_lock));

        BUG_ON(trgt_out_o->targettype != TARGET_OUT);
        BUG_ON(trgt_out_o->target.out.nb != nb);

        spin_lock_bh(&(nb->retrans_conn_lock));
        if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
                spin_unlock_bh(&(nb->retrans_conn_lock));
                goto out;
        }

        BUG_ON(trgt_out_o->isreset != 0);

        BUG_ON(cor_seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));

        if (cor_seqno_after_eq(cr->seqno,
                        trgt_out_o->target.out.seqno_windowlimit)) {
                BUG_ON(cr->state != CONN_RETRANS_SENDING);
                cr->state = CONN_RETRANS_LOWWINDOW;
                if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
                        trgt_out_o->target.out.retrans_lowwindow++;

                spin_unlock_bh(&(nb->retrans_conn_lock));
                goto out;
        }

        rc = _cor_send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);

        spin_unlock_bh(&(nb->retrans_conn_lock));

        kref_get(&(trgt_out_o->ref));

        if (rc == RC_SENDRETRANS_OK)
                rc = __cor_send_retrans(nb, trgt_out_o, cr, bytes_sent);

        if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
                spin_lock_bh(&(nb->retrans_conn_lock));
                if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
                } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
                        if (rc == RC_SENDRETRANS_OOM)
                                cr->timeout = jiffies + 1;
                        list_add(&(cr->timeout_list), &(nb->retrans_conn_list));
                        cr->state = CONN_RETRANS_SCHEDULED;
                } else {
                        BUG();
                }
                spin_unlock_bh(&(nb->retrans_conn_lock));
        }

out:
        spin_unlock_bh(&(trgt_out_o->rcv_lock));

        kref_put(&(trgt_out_o->ref), cor_free_conn);

        return (rc == RC_SENDRETRANS_OOM ||
                        rc == RC_SENDRETRANS_QUEUEFULL ||
                        rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
}

int cor_send_retrans(struct cor_neighbor *nb, int *sent)
{
        int queuefull = 0;
        int nbstate = cor_get_neigh_state(nb);
        __u64 bytes_sent = 0;

        if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
                return QOS_RESUME_DONE;
        } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
                /*
                 * cor_cancel_all_conn_retrans_nb should not be needed, because
                 * cor_reset_all_conns calls cor_cancel_all_conn_retrans
                 */
                cor_cancel_all_conn_retrans_nb(nb);
                return QOS_RESUME_DONE;
        }

        while (1) {
                struct cor_conn_retrans *cr = 0;

                spin_lock_bh(&(nb->retrans_conn_lock));

                if (list_empty(&(nb->retrans_conn_list))) {
                        spin_unlock_bh(&(nb->retrans_conn_lock));
                        break;
                }

                cr = container_of(nb->retrans_conn_list.next,
                                struct cor_conn_retrans, timeout_list);

                BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

                if (time_after(cr->timeout, jiffies)) {
                        cor_reschedule_conn_retrans_timer(nb);
                        spin_unlock_bh(&(nb->retrans_conn_lock));
                        break;
                }

                kref_get(&(cr->ref));
                list_del(&(cr->timeout_list));
                cr->state = CONN_RETRANS_SENDING;

                spin_unlock_bh(&(nb->retrans_conn_lock));

                queuefull = _cor_send_retrans(nb, cr, &bytes_sent);
                kref_put(&(cr->ref), cor_free_connretrans);
                if (queuefull) {
                        break;
                } else {
                        *sent = 1;
                }
        }

        if (bytes_sent > 0)
                cor_nbcongwin_data_retransmitted(nb, bytes_sent);

        return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
}

void cor_retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
{
        struct cor_neighbor *nb = container_of(retrans_conn_timer,
                        struct cor_neighbor, retrans_conn_timer);

        cor_qos_enqueue(nb->queue, &(nb->rb_cr), ns_to_ktime(0),
                        QOS_CALLER_CONN_RETRANS);
        kref_put(&(nb->ref), cor_neighbor_free);
}
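
/*
 * An out-of-order ack may hit the middle of a retransmit range: for
 * cr = [10, 50) and an ack of [20, 30), the range is split into
 * cr = [10, 20) and a new cr2 = [30, 50), and the 10 acked bytes are
 * credited to *bytes_acked.
 */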

static void cor_conn_ack_ooo_rcvd_splitcr(struct cor_conn *trgt_out_l,
                struct cor_conn_retrans *cr, __u64 seqno_ooo, __u32 length,
                __u64 *bytes_acked)
{
        struct cor_conn_retrans *cr2;
        __u64 seqno_cr2start;
        __u32 oldcrlength = cr->length;

        if (cr->state != CONN_RETRANS_SCHEDULED &&
                        cr->state != CONN_RETRANS_LOWWINDOW)
                return;

        seqno_cr2start = seqno_ooo + length;
        cr2 = cor_prepare_conn_retrans(trgt_out_l, seqno_cr2start,
                        cor_seqno_clean(cr->seqno + cr->length -
                        seqno_cr2start),
                        cr->snd_delayed_lowbuf, cr, 1);

        if (unlikely(cr2 == 0))
                return;

        BUG_ON(cr2->length > cr->length);

        cr2->timeout = cr->timeout;
        cr2->state = cr->state;

        /* cr is only on the timeout list while it is scheduled */
        if (cr->state == CONN_RETRANS_SCHEDULED)
                list_add(&(cr2->timeout_list), &(cr->timeout_list));

        BUG_ON(cor_seqno_clean(seqno_ooo - cr->seqno) > cr->length);

        cr->length = cor_seqno_clean(seqno_ooo - cr->seqno);
        BUG_ON(cr->length + length + cr2->length != oldcrlength);

        *bytes_acked += length;
}

void cor_conn_ack_ooo_rcvd(struct cor_neighbor *nb, __u32 conn_id,
                struct cor_conn *trgt_out, __u64 seqno_ooo, __u32 length,
                __u64 *bytes_acked)
{
        struct list_head *curr;

        if (unlikely(length == 0))
                return;

        spin_lock_bh(&(trgt_out->rcv_lock));

        if (unlikely(trgt_out->targettype != TARGET_OUT))
                goto out;
        if (unlikely(trgt_out->target.out.nb != nb))
                goto out;
        if (unlikely(trgt_out->target.out.conn_id != conn_id))
                goto out;

        kref_get(&(nb->ref));
        spin_lock_bh(&(nb->retrans_conn_lock));

        curr = trgt_out->target.out.retrans_list.next;
        while (curr != &(trgt_out->target.out.retrans_list)) {
                struct cor_conn_retrans *cr = container_of(curr,
                                struct cor_conn_retrans, conn_list);

                int ack_covers_start = cor_seqno_after_eq(cr->seqno, seqno_ooo);
                int ack_covers_end = cor_seqno_before_eq(cr->seqno + cr->length,
                                seqno_ooo + length);

                curr = curr->next;

                if (cor_seqno_before(cr->seqno + cr->length, seqno_ooo))
                        continue;

                if (cor_seqno_after(cr->seqno, seqno_ooo + length))
                        break;

                if (likely(ack_covers_start && ack_covers_end)) {
                        cor_cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
                        cor_reschedule_conn_retrans_timer(nb);
                } else if (ack_covers_start) {
                        /* the ack covers the head of cr; trim it off */
                        __u32 diff = seqno_ooo + length - cr->seqno;

                        BUG_ON(diff >= cr->length);
                        cr->seqno += diff;
                        cr->length -= diff;
                        *bytes_acked += diff;
                } else if (ack_covers_end) {
                        /* the ack covers the tail of cr; trim it off */
                        __u32 diff = cr->seqno + cr->length - seqno_ooo;

                        BUG_ON(diff >= length);
                        cr->length -= diff;
                        *bytes_acked += diff;
                } else {
                        cor_conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
                                        length, bytes_acked);
                        break;
                }
        }

        if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
                trgt_out->target.out.seqno_acked =
                                trgt_out->target.out.seqno_nextsend;
        } else {
                struct cor_conn_retrans *cr = container_of(
                                trgt_out->target.out.retrans_list.next,
                                struct cor_conn_retrans, conn_list);

                if (cor_seqno_after(cr->seqno,
                                trgt_out->target.out.seqno_acked))
                        trgt_out->target.out.seqno_acked = cr->seqno;
        }

        spin_unlock_bh(&(nb->retrans_conn_lock));
        kref_put(&(nb->ref), cor_neighbor_free);

out:
        spin_unlock_bh(&(trgt_out->rcv_lock));
}

static void _cor_conn_ack_rcvd_nosendwin(struct cor_conn *trgt_out_l)
{
        if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
                        trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
                trgt_out_l->bufsize.state = BUFSIZE_NOACTION;

        if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
                trgt_out_l->bufsize.act.noact.bytesleft = max(
                                trgt_out_l->bufsize.act.noact.bytesleft,
                                (__u32) BUF_OUT_WIN_NOK_NOINCR);

        trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
                        trgt_out_l->bufsize.ignore_rcv_lowbuf,
                        (__u32) BUF_OUT_WIN_NOK_NOINCR);
}

/**
 * nb->retrans_conn_lock must be held when calling this
 * (see cor_schedule_retransmit_conn())
 */
static void cor_reschedule_lowwindow_retrans(struct cor_conn *trgt_out_l)
{
        struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
        int cnt = 0;

        while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
                struct cor_conn_retrans *cr;

                if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
                        BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
                                        65535);
                        trgt_out_l->target.out.retrans_lowwindow = 0;
                        break;
                }

                cr = container_of(lh, struct cor_conn_retrans, conn_list);

                if (cor_seqno_after_eq(cr->seqno,
                                trgt_out_l->target.out.seqno_windowlimit)) {
                        break;
                }

                if (cr->state == CONN_RETRANS_LOWWINDOW)
                        cor_schedule_retransmit_conn(cr, 1, 1);

                lh = lh->next;
                cnt++;
        }
}
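
/*
 * Process a cumulative ack for everything up to seqno, optionally carrying a
 * new receive window (decoded via cor_dec_log_64_7()). Acked retransmits are
 * cancelled, and if the window grew, retransmits parked in
 * CONN_RETRANS_LOWWINDOW are put back on the schedule.
 */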

void cor_conn_ack_rcvd(struct cor_neighbor *nb, __u32 conn_id,
                struct cor_conn *trgt_out, __u64 seqno, int setwindow,
                __u8 window, __u64 *bytes_acked)
{
        int seqno_advanced = 0;
        int window_enlarged = 0;

        spin_lock_bh(&(trgt_out->rcv_lock));

        if (unlikely(trgt_out->isreset != 0))
                goto out;
        if (unlikely(trgt_out->targettype != TARGET_OUT))
                goto out;
        if (unlikely(trgt_out->target.out.nb != nb))
                goto out;
        if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
                goto out;

        if (unlikely(cor_seqno_after(seqno,
                        trgt_out->target.out.seqno_nextsend) ||
                        cor_seqno_before(seqno,
                        trgt_out->target.out.seqno_acked)))
                goto out;

        if (setwindow) {
                __u64 windowdec = cor_dec_log_64_7(window);

                if (likely(cor_seqno_after(seqno,
                                trgt_out->target.out.seqno_acked)) ||
                                cor_seqno_after(seqno + windowdec,
                                trgt_out->target.out.seqno_windowlimit)) {
                        trgt_out->target.out.seqno_windowlimit = seqno +
                                        windowdec;
                        window_enlarged = 1;
                }
        }

        if (cor_seqno_after(seqno, trgt_out->target.out.seqno_acked))
                seqno_advanced = 1;

        if (seqno_advanced == 0 && window_enlarged == 0)
                goto out;

        kref_get(&(nb->ref));
        spin_lock_bh(&(nb->retrans_conn_lock));

        if (seqno_advanced) {
                trgt_out->target.out.seqno_acked = seqno;
                cor_cancel_acked_conn_retrans(trgt_out, bytes_acked);
        }

        if (window_enlarged)
                cor_reschedule_lowwindow_retrans(trgt_out);

        spin_unlock_bh(&(nb->retrans_conn_lock));
        kref_put(&(nb->ref), cor_neighbor_free);

        if (seqno_advanced)
                cor_databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);

        if (cor_seqno_eq(trgt_out->target.out.seqno_acked,
                        trgt_out->target.out.seqno_nextsend))
                _cor_conn_ack_rcvd_nosendwin(trgt_out);

out:
        if (seqno_advanced || window_enlarged)
                cor_flush_buf(trgt_out);

        spin_unlock_bh(&(trgt_out->rcv_lock));

        cor_wake_sender(trgt_out);
}

static void cor_try_combine_conn_retrans_prev(
                struct cor_neighbor *nb_retransconnlocked,
                struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
        struct cor_conn_retrans *cr_prev;
        __u64 bytes_dummyacked = 0;

        BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

        if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
                return;

        cr_prev = container_of(cr->conn_list.prev, struct cor_conn_retrans,
                        conn_list);

        if (cr_prev->state != CONN_RETRANS_SCHEDULED)
                return;
        if (cr_prev->timeout != cr->timeout)
                return;
        if (!cor_seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
                return;

        cr->seqno -= cr_prev->length;
        cr->length += cr_prev->length;

        cor_cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
                        &bytes_dummyacked);
}

static void cor_try_combine_conn_retrans_next(
                struct cor_neighbor *nb_retranslocked,
                struct cor_conn *trgt_out_lx, struct cor_conn_retrans *cr)
{
        struct cor_conn_retrans *cr_next;
        __u64 bytes_dummyacked = 0;

        BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

        if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
                return;

        cr_next = container_of(cr->conn_list.next, struct cor_conn_retrans,
                        conn_list);

        if (cr_next->state != CONN_RETRANS_SCHEDULED)
                return;
        if (cr_next->timeout != cr->timeout)
                return;
        if (!cor_seqno_eq(cr->seqno + cr->length, cr_next->seqno))
                return;

        cr->length += cr_next->length;

        cor_cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
                        &bytes_dummyacked);
}
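
/*
 * (Re)arm the retransmit for cr: compute its timeout from the measured
 * neighbor latency, append it to nb->retrans_conn_list, and try to merge it
 * with adjacent ranges that share the same timeout so fewer, larger
 * retransmits are sent.
 */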

void cor_schedule_retransmit_conn(struct cor_conn_retrans *cr, int connlocked,
                int nbretransconn_locked)
{
        struct cor_conn *trgt_out_o = cr->trgt_out_o;
        struct cor_neighbor *nb;
        int first;

        if (connlocked == 0)
                spin_lock_bh(&(trgt_out_o->rcv_lock));

        BUG_ON(trgt_out_o->targettype != TARGET_OUT);
        nb = trgt_out_o->target.out.nb;

        cr->timeout = cor_calc_timeout(atomic_read(&(nb->latency_retrans_us)),
                        atomic_read(&(nb->latency_stddev_retrans_us)),
                        atomic_read(&(nb->max_remote_ackconn_delay_us)));

        if (nbretransconn_locked == 0)
                spin_lock_bh(&(nb->retrans_conn_lock));

        kref_get(&(nb->ref));

        BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);

        if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
                goto out;
        } else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
                BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
                if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
                        trgt_out_o->target.out.retrans_lowwindow--;
        }

        first = unlikely(list_empty(&(nb->retrans_conn_list)));
        list_add_tail(&(cr->timeout_list), &(nb->retrans_conn_list));
        cr->state = CONN_RETRANS_SCHEDULED;

        if (unlikely(first)) {
                cor_reschedule_conn_retrans_timer(nb);
        } else {
                cor_try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
                cor_try_combine_conn_retrans_next(nb, trgt_out_o, cr);
        }

out:
        if (nbretransconn_locked == 0)
                spin_unlock_bh(&(nb->retrans_conn_lock));

        kref_put(&(nb->ref), cor_neighbor_free);

        if (connlocked == 0)
                spin_unlock_bh(&(trgt_out_o->rcv_lock));
}
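
/*
 * First transmission of len bytes from the conn's data buffer. Depending on
 * cor_send_conndata_as_skb(), the data is either sent in an skb of its own
 * (_cor_flush_out_skb()) or handed to the control message machinery
 * (_cor_flush_out_conndata()); _cor_flush_out() picks the variant per chunk.
 */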

static int _cor_flush_out_skb(struct cor_conn *trgt_out_lx, __u32 len,
                __u8 snd_delayed_lowbuf)
{
        struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

        __u64 seqno;
        struct cor_conn_retrans *cr;
        struct sk_buff *skb;
        char *dst;
        __u8 flush = 0;
        int rc;

        if (trgt_out_lx->flush != 0 &&
                        trgt_out_lx->data_buf.read_remaining == len)
                flush = 1;

        seqno = trgt_out_lx->target.out.seqno_nextsend;
        skb = cor_create_packet_conndata(trgt_out_lx->target.out.nb, len,
                        GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
                        snd_delayed_lowbuf, flush);
        if (unlikely(skb == 0))
                return RC_FLUSH_CONN_OUT_OOM;

        cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
                        snd_delayed_lowbuf, 0, 0);
        if (unlikely(cr == 0)) {
                kfree_skb(skb);
                return RC_FLUSH_CONN_OUT_OOM;
        }

        dst = skb_put(skb, len);

        cor_databuf_pull(trgt_out_lx, dst, len);

        rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
        if (rc == NET_XMIT_DROP) {
                cor_databuf_unpull(trgt_out_lx, len);
                spin_lock_bh(&(nb->retrans_conn_lock));
                /* cr is still in its initial state, bytes_acked is unused */
                cor_cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
                spin_unlock_bh(&(nb->retrans_conn_lock));
                kref_put(&(cr->ref), cor_free_connretrans);
                return RC_FLUSH_CONN_OUT_CONG;
        }

        trgt_out_lx->target.out.seqno_nextsend += len;
        cor_nbcongwin_data_sent(nb, len);
        cor_schedule_retransmit_conn(cr, 1, 0);
        if (trgt_out_lx->sourcetype == SOURCE_SOCK)
                cor_update_src_sock_sndspeed(trgt_out_lx, len);

        kref_put(&(cr->ref), cor_free_connretrans);

        return (rc == NET_XMIT_SUCCESS) ?
                        RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
}

static int _cor_flush_out_conndata(struct cor_conn *trgt_out_lx, __u32 len,
                __u8 snd_delayed_lowbuf)
{
        __u64 seqno;
        struct cor_control_msg_out *cm;
        struct cor_conn_retrans *cr;
        char *buf;
        __u8 flush = 0;

        if (trgt_out_lx->flush != 0 &&
                        trgt_out_lx->data_buf.read_remaining == len)
                flush = 1;

        buf = kmalloc(len, GFP_ATOMIC);

        if (unlikely(buf == 0))
                return RC_FLUSH_CONN_OUT_OOM;

        cm = cor_alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
        if (unlikely(cm == 0)) {
                kfree(buf);
                return RC_FLUSH_CONN_OUT_OOM;
        }

        seqno = trgt_out_lx->target.out.seqno_nextsend;

        cr = cor_prepare_conn_retrans(trgt_out_lx, seqno, len,
                        snd_delayed_lowbuf, 0, 0);
        if (unlikely(cr == 0)) {
                kfree(buf);
                cor_free_control_msg(cm);
                return RC_FLUSH_CONN_OUT_OOM;
        }

        cor_databuf_pull(trgt_out_lx, buf, len);
        trgt_out_lx->target.out.seqno_nextsend += len;
        cor_nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
        if (trgt_out_lx->sourcetype == SOURCE_SOCK)
                cor_update_src_sock_sndspeed(trgt_out_lx, len);

        cor_send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf,
                        len, snd_delayed_lowbuf, flush,
                        trgt_out_lx->is_highlatency, cr);

        return RC_FLUSH_CONN_OUT_OK;
}

int cor_srcin_buflimit_reached(struct cor_conn *src_in_lx)
{
        __u64 window_left;

        if (unlikely(cor_seqno_before(src_in_lx->source.in.window_seqnolimit,
                        src_in_lx->source.in.next_seqno)))
                return 1;

        window_left = cor_seqno_clean(src_in_lx->source.in.window_seqnolimit -
                        src_in_lx->source.in.next_seqno);

        if (window_left < WINDOW_ENCODE_MIN)
                return 1;

        if (window_left / 2 < src_in_lx->data_buf.read_remaining)
                return 1;

        return 0;
}
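
/*
 * Round a send budget down to a "nice" chunk size: below 128 bytes it is
 * used as-is, up to 4096 it is rounded down to a power of two, and above
 * that to a multiple of 4096 (e.g. 1000 -> 512, 10000 -> 8192).
 */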

static __u32 cor_maxsend_left_to_len(__u32 maxsend_left)
{
        __u32 i;

        if (maxsend_left < 128)
                return maxsend_left;

        for (i = 128; i < 4096;) {
                if (i * 2 > maxsend_left)
                        return i;
                i = i * 2;
        }

        return maxsend_left - maxsend_left % 4096;
}
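
/*
 * Decide whether the send window is about to run out: after sending sndlen
 * more bytes, the remaining window must stay above 1/4 (high latency conns)
 * or 1/8 (low latency conns) of the bytes then in flight, otherwise the
 * caller marks the conn as windowlimit_reached.
 */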

static int cor_seqno_low_sendlimit(struct cor_conn *trgt_out_lx,
                __u64 windowlimit, __u32 sndlen)
{
        __u64 bytes_ackpending;

        BUG_ON(cor_seqno_before(trgt_out_lx->target.out.seqno_nextsend,
                        trgt_out_lx->target.out.seqno_acked));

        bytes_ackpending = cor_seqno_clean(
                        trgt_out_lx->target.out.seqno_nextsend -
                        trgt_out_lx->target.out.seqno_acked);

        if (windowlimit <= sndlen)
                return 1;

        if (unlikely(bytes_ackpending + sndlen < bytes_ackpending))
                return 0;

        if (trgt_out_lx->is_highlatency != 0)
                return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 4)
                                ? 1 : 0;
        else
                return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 8)
                                ? 1 : 0;
}

static void _cor_flush_out_ignore_lowbuf(struct cor_conn *trgt_out_lx)
{
        trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
                        trgt_out_lx->bufsize.ignore_rcv_lowbuf,
                        trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

static __u64 cor_get_windowlimit(struct cor_conn *trgt_out_lx)
{
        if (unlikely(cor_seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
                        trgt_out_lx->target.out.seqno_nextsend)))
                return 0;

        return cor_seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
                        trgt_out_lx->target.out.seqno_nextsend);
}
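
/*
 * Main send path: push buffered data to the neighbor in targetmss-sized
 * chunks while the send budget (maxsend), the receive window and the
 * neighbor congestion window allow it, then decide whether a smaller
 * trailing chunk should be sent now or held back until more data has
 * accumulated.
 */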

int _cor_flush_out(struct cor_conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
                int from_qos, int maxsend_forcedelay)
{
        struct cor_neighbor *nb = trgt_out_lx->target.out.nb;

        __u32 targetmss;

        int nbstate;

        __u8 snd_delayed_lowbuf = trgt_out_lx->target.out.windowlimit_reached;

        __u32 maxsend_left = maxsend;

        trgt_out_lx->target.out.windowlimit_reached = 0;

        BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

        if (unlikely(trgt_out_lx->target.out.established == 0))
                return RC_FLUSH_CONN_OUT_OK;

        if (unlikely(trgt_out_lx->isreset != 0))
                return RC_FLUSH_CONN_OUT_OK;

        BUG_ON(trgt_out_lx->target.out.conn_id == 0);

        if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
                return RC_FLUSH_CONN_OUT_OK;

        if (from_qos == 0 && cor_qos_fastsend_allowed_conn(trgt_out_lx) == 0)
                return RC_FLUSH_CONN_OUT_CONG;

        cor_get_conn_idletime(trgt_out_lx);

        spin_lock_bh(&(nb->stalledconn_lock));
        nbstate = cor_get_neigh_state(nb);
        if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
                BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
                                trgt_out_lx->target.out.nbstalled_lh.next != 0);
                BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
                                trgt_out_lx->target.out.nbstalled_lh.next == 0);

                if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
                        kref_get(&(trgt_out_lx->ref));
                        list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
                                        &(nb->stalledconn_list));
                }
        }
        spin_unlock_bh(&(nb->stalledconn_lock));

        if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
                return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

        /* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
                        cor_get_windowlimit(trgt_out_l),
                        trgt_out_l->data_buf.read_remaining); */

        targetmss = cor_mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

        while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
                __u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
                int rc;

                if (maxsend_left < targetmss)
                        break;

                if (windowlimit < targetmss) {
                        trgt_out_lx->target.out.windowlimit_reached = 1;
                        snd_delayed_lowbuf = 1;
                        _cor_flush_out_ignore_lowbuf(trgt_out_lx);
                        break;
                }

                if (cor_nbcongwin_send_allowed(nb) == 0)
                        return RC_FLUSH_CONN_OUT_CONG;

                if (cor_seqno_low_sendlimit(trgt_out_lx, windowlimit,
                                targetmss)) {
                        trgt_out_lx->target.out.windowlimit_reached = 1;
                        snd_delayed_lowbuf = 1;
                }

                if (likely(cor_send_conndata_as_skb(nb, targetmss)))
                        rc = _cor_flush_out_skb(trgt_out_lx, targetmss,
                                        snd_delayed_lowbuf);
                else
                        rc = _cor_flush_out_conndata(trgt_out_lx, targetmss,
                                        snd_delayed_lowbuf);

                if (rc == RC_FLUSH_CONN_OUT_OK ||
                                rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
                        maxsend_left -= targetmss;
                        *sent += targetmss;
                }

                if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
                        return RC_FLUSH_CONN_OUT_CONG;
                if (rc != RC_FLUSH_CONN_OUT_OK)
                        return rc;
        }

        if (trgt_out_lx->data_buf.read_remaining > 0) {
                __u32 len = trgt_out_lx->data_buf.read_remaining;
                __u64 windowlimit = cor_get_windowlimit(trgt_out_lx);
                int rc;

                if (maxsend_left < len) {
                        if (maxsend_left >= 65536 || (
                                        maxsend_left == maxsend &&
                                        maxsend_left >= 128 &&
                                        trgt_out_lx->is_highlatency == 0 &&
                                        !maxsend_forcedelay)) {
                                len = cor_maxsend_left_to_len(maxsend_left);
                        } else {
                                return RC_FLUSH_CONN_OUT_MAXSENT;
                        }
                }

                if (trgt_out_lx->flush == 0 &&
                                trgt_out_lx->sourcetype == SOURCE_SOCK &&
                                cor_sock_sndbufavailable(trgt_out_lx) != 0)
                        goto out;

                if (trgt_out_lx->flush == 0 &&
                                trgt_out_lx->sourcetype == SOURCE_IN &&
                                cor_srcin_buflimit_reached(trgt_out_lx)
                                == 0 && (
                                cor_seqno_eq(
                                trgt_out_lx->target.out.seqno_nextsend,
                                trgt_out_lx->target.out.seqno_acked) == 0 ||
                                trgt_out_lx->is_highlatency != 0))
                        goto out;

                if (trgt_out_lx->flush == 0 &&
                                trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
                                cor_cpacket_write_allowed(trgt_out_lx) != 0)
                        goto out;

                if (windowlimit == 0 || (windowlimit < len &&
                                cor_seqno_eq(
                                trgt_out_lx->target.out.seqno_nextsend,
                                trgt_out_lx->target.out.seqno_acked) == 0)) {
                        trgt_out_lx->target.out.windowlimit_reached = 1;
                        snd_delayed_lowbuf = 1;
                        _cor_flush_out_ignore_lowbuf(trgt_out_lx);
                        goto out;
                }

                if (cor_nbcongwin_send_allowed(nb) == 0)
                        return RC_FLUSH_CONN_OUT_CONG;

                if (cor_seqno_low_sendlimit(trgt_out_lx, windowlimit, len)) {
                        trgt_out_lx->target.out.windowlimit_reached = 1;
                        snd_delayed_lowbuf = 1;
                }

                if (len > windowlimit) {
                        len = windowlimit;
                        _cor_flush_out_ignore_lowbuf(trgt_out_lx);
                }

                if (cor_send_conndata_as_skb(nb, len))
                        rc = _cor_flush_out_skb(trgt_out_lx, len,
                                        snd_delayed_lowbuf);
                else
                        rc = _cor_flush_out_conndata(trgt_out_lx, len,
                                        snd_delayed_lowbuf);

                if (rc == RC_FLUSH_CONN_OUT_OK ||
                                rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
                        maxsend_left -= len;
                        *sent += len;
                }

                if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
                        return RC_FLUSH_CONN_OUT_CONG;
                if (rc != RC_FLUSH_CONN_OUT_OK)
                        return rc;
        }

out:
        return RC_FLUSH_CONN_OUT_OK;
}

void cor_resume_nbstalled_conns(struct work_struct *work)
{
        struct cor_neighbor *nb = container_of(work, struct cor_neighbor,
                        stalledconn_work);
        int rc = RC_FLUSH_CONN_OUT_OK;

        spin_lock_bh(&(nb->stalledconn_lock));
        nb->stalledconn_work_scheduled = 0;
        while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
                        list_empty(&(nb->stalledconn_list)) == 0) {
                struct list_head *lh = nb->stalledconn_list.next;
                struct cor_conn *trgt_out = container_of(lh, struct cor_conn,
                                target.out.nbstalled_lh);
                __u32 sent = 0;

                BUG_ON(trgt_out->targettype != TARGET_OUT);
                list_del(lh);
                lh->prev = 0;
                lh->next = 0;

                spin_unlock_bh(&(nb->stalledconn_lock));

                spin_lock_bh(&(trgt_out->rcv_lock));
                if (likely(trgt_out->targettype == TARGET_OUT))
                        rc = cor_flush_out(trgt_out, &sent);
                spin_unlock_bh(&(trgt_out->rcv_lock));

                if (sent != 0)
                        cor_wake_sender(trgt_out);

                kref_put(&(trgt_out->ref), cor_free_conn);

                spin_lock_bh(&(nb->stalledconn_lock));
        }
        spin_unlock_bh(&(nb->stalledconn_lock));

        kref_put(&(nb->ref), cor_neighbor_free);
}

int __init cor_snd_init(void)
{
        cor_connretrans_slab = kmem_cache_create("cor_connretrans",
                        sizeof(struct cor_conn_retrans), 8, 0, 0);
        if (unlikely(cor_connretrans_slab == 0))
                return -ENOMEM;

        return 0;
}

void __exit cor_snd_exit2(void)
{
        kmem_cache_destroy(cor_connretrans_slab);
        cor_connretrans_slab = 0;
}

MODULE_LICENSE("GPL");