full mss for high latency conns
[cor.git] / net / cor / snd.c
blob 86f23aa8d87fa9b17b3e3bc658bfb7fe52f35443
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2020 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
25 #include "cor.h"
27 static struct kmem_cache *connretrans_slab;
29 static DEFINE_SPINLOCK(queues_lock);
30 static LIST_HEAD(queues);
32 static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
33 int from_qos, int maxsend_forcedelay);
35 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
36 int caller, int from_nbcongwin_resume);
39 #ifdef DEBUG_QOS_SLOWSEND
40 static DEFINE_SPINLOCK(slowsend_lock);
41 static unsigned long last_send;
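/* Editorial note: with DEBUG_QOS_SLOWSEND defined, this wrapper lets only
 * about one frame per jiffy reach dev_queue_xmit() and drops the rest with
 * NET_XMIT_DROP, which makes it easy to exercise the congestion paths below. */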
44 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
46 int allowsend = 0;
47 unsigned long jiffies_tmp;
48 spin_lock_bh(&slowsend_lock);
49 jiffies_tmp = jiffies;
50 if (last_send != jiffies_tmp) {
51 if (last_send + 1 == jiffies_tmp) {
52 last_send = jiffies_tmp;
53 } else {
54 last_send = jiffies_tmp - 1;
56 allowsend = 1;
58 spin_unlock_bh(&slowsend_lock);
60 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
61 if (allowsend) {
62 return dev_queue_xmit(skb);
63 } else {
64 kfree_skb(skb);
65 return NET_XMIT_DROP;
68 #endif
70 static void free_connretrans(struct kref *ref)
72 struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
73 struct conn *cn = cr->trgt_out_o;
75 BUG_ON(cr->state != CONN_RETRANS_ACKED);
77 kmem_cache_free(connretrans_slab, cr);
78 kref_put(&(cn->ref), free_conn);
81 void free_qos(struct kref *ref)
83 struct qos_queue *q = container_of(ref, struct qos_queue, ref);
84 kfree(q);
88 static void qos_queue_set_congstatus(struct qos_queue *q_locked);
90 /**
91 * neighbor congestion window:
92 * increment by 4096 every round trip if more than 2/3 of cwin is used
94 * in case of packet loss decrease by 1/4:
95 * - <= 1/8 immediately and
96 * - <= 1/4 during the next round trip
98 * in case of multiple packet loss events, do not decrement more than once per
99 * round trip
102 #ifdef COR_NBCONGWIN
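/* Editorial sketch, not part of cor (the helper name is made up): the growth
 * rule described above, written without the NBCONGWIN_SHIFT fixed-point
 * scaling that the real code below uses. Summed over one round trip the
 * bytes_acked terms add up to roughly one cwin, so cwin grows by about 4096
 * bytes per round trip while more than about 2/3 of it is in use. */
static inline __u64 nbcongwin_growth_sketch(__u64 cwin, __u64 data_intransit,
		__u64 bytes_acked)
{
	/* mirrors cwin_tmp in nbcongwin_update_cwin(): caps the increment */
	__u64 base = max(cwin, bytes_acked);

	if (bytes_acked == 0 || data_intransit * 3 < cwin * 2)
		return cwin; /* idle or less than 2/3 of cwin in use */

	return cwin + div64_u64(bytes_acked * 4096, base);
}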
104 /*extern __u64 get_bufspace_used(void);
106 static void print_conn_bufstats(struct neighbor *nb)
108 / * not threadsafe, but this is only for debugging... * /
109 __u64 totalsize = 0;
110 __u64 read_remaining = 0;
111 __u32 numconns = 0;
112 struct list_head *lh;
113 unsigned long iflags;
115 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
117 lh = nb->conns_waiting.lh.next;
118 while (lh != &(nb->conns_waiting.lh)) {
119 struct conn *cn = container_of(lh, struct conn,
120 target.out.rb.lh);
121 totalsize += cn->data_buf.datasize;
122 read_remaining += cn->data_buf.read_remaining;
123 lh = lh->next;
126 lh = nb->conns_waiting.lh_nextpass.next;
127 while (lh != &(nb->conns_waiting.lh_nextpass)) {
128 struct conn *cn = container_of(lh, struct conn,
129 target.out.rb.lh);
130 totalsize += cn->data_buf.datasize;
131 read_remaining += cn->data_buf.read_remaining;
132 lh = lh->next;
135 numconns = nb->conns_waiting.cnt;
137 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
139 printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining, numconns);
140 } */
142 static void nbcongwin_data_retransmitted(struct neighbor *nb, __u64 bytes_sent)
144 __u64 min_cwin = mss_conndata(nb, 0)*2 << NBCONGWIN_SHIFT;
145 __u64 cwin;
147 unsigned long iflags;
149 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
151 cwin = atomic64_read(&(nb->nbcongwin.cwin));
153 /* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
154 get_bufspace_used());
155 print_conn_bufstats(nb); */
157 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
158 BUG_ON(cwin >= U64_MAX/1024);
160 if (bytes_sent > 1024)
161 bytes_sent = 1024;
163 if (nb->nbcongwin.cwin_shrinkto == cwin) {
164 if (bytes_sent > 512) {
165 cwin -= cwin/16;
166 } else {
167 cwin -= (bytes_sent * cwin) / (1024 * 8);
169 if (cwin < min_cwin)
170 cwin = min_cwin;
171 atomic64_set(&(nb->nbcongwin.cwin), cwin);
174 nb->nbcongwin.cwin_shrinkto -=
175 (bytes_sent * nb->nbcongwin.cwin_shrinkto) / (1024 * 8);
177 nb->nbcongwin.cwin_shrinkto = max(nb->nbcongwin.cwin_shrinkto,
178 cwin - cwin/8);
180 if (nb->nbcongwin.cwin_shrinkto < min_cwin)
181 nb->nbcongwin.cwin_shrinkto = min_cwin;
183 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
186 static __u64 nbcongwin_update_cwin(struct neighbor *nb_cwlocked,
187 __u64 data_intransit, __u64 bytes_acked)
189 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
190 __u32 INCR_PER_RTT = 4096;
192 __u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));
194 __u64 cwin_tmp;
195 __u64 incrby;
197 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
198 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
199 if (unlikely(shrinkby > cwin))
200 cwin = 0;
201 else
202 cwin -= shrinkby;
204 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
205 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
209 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
210 goto out;
212 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
214 if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
215 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
216 cwin_tmp / CWIN_MUL / CWIN_MUL);
217 else if (unlikely(bytes_acked >=
218 U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
219 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
220 cwin_tmp / CWIN_MUL);
221 else
222 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
223 CWIN_MUL, cwin_tmp);
225 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
227 if (unlikely(cwin + incrby < cwin))
228 cwin = U64_MAX;
229 else
230 cwin += incrby;
232 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
233 nb_cwlocked->nbcongwin.cwin_shrinkto))
234 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
235 else
236 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
238 out:
239 atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);
241 return cwin;
244 void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
246 unsigned long iflags;
247 struct qos_queue *q = nb->queue;
248 __u64 data_intransit;
249 __u64 cwin;
251 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
253 data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));
255 cwin = nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
257 BUG_ON(bytes_acked > data_intransit);
258 atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
259 data_intransit -= bytes_acked;
261 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
262 goto out_sendnok;
264 spin_lock(&(q->qlock));
265 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
266 if (nb->conns_waiting.cnt == 0) {
267 nb->rb.in_queue = RB_INQUEUE_FALSE;
268 } else {
269 _qos_enqueue(q, &(nb->rb), QOS_CALLER_NEIGHBOR, 1);
272 spin_unlock(&(q->qlock));
275 out_sendnok:
276 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
279 static void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
281 atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
284 static int nbcongwin_send_allowed(struct neighbor *nb)
286 unsigned long iflags;
287 int ret = 1;
288 struct qos_queue *q = nb->queue;
289 int krefput_queue = 0;
291 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
292 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
293 return 1;
295 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
297 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
298 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
299 goto out_ok;
301 ret = 0;
303 spin_lock(&(q->qlock));
304 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
305 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
306 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
307 list_del(&(nb->rb.lh));
308 kref_put(&(nb->ref), kreffree_bug);
309 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
310 BUG_ON(q->numconns < nb->conns_waiting.cnt);
311 q->numconns -= nb->conns_waiting.cnt;
312 q->priority_sum -= nb->conns_waiting.priority_sum;
313 krefput_queue = 1;
315 qos_queue_set_congstatus(q);
316 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
317 } else {
318 BUG();
320 spin_unlock(&(q->qlock));
322 if (krefput_queue != 0)
323 kref_put(&(q->ref), free_qos);
325 out_ok:
326 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
328 return ret;
331 #else
333 static inline void nbcongwin_data_retransmitted(struct neighbor *nb,
334 __u64 bytes_sent)
338 static inline void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
342 static inline void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
346 static inline int nbcongwin_send_allowed(struct neighbor *nb)
348 return 1;
351 #endif
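/* Editorial note: _resume_conns_maxsend() below splits the send budget among
 * the conns waiting on this queue in proportion to their priority: a conn of
 * average priority gets about 1024 bytes per pass (2048, with
 * maxsend_forcedelay set, when at most four conns are waiting). For example,
 * with q->priority_sum == 4000 over numconns == 8, a conn with
 * newpriority == 1000 gets div_u64(1024 * 1000 * 8, 4000) == 2048 bytes. */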
353 static __u64 _resume_conns_maxsend(struct qos_queue *q, struct conn *trgt_out_l,
354 __u32 newpriority, int *maxsend_forcedelay)
356 unsigned long iflags;
358 struct neighbor *nb = trgt_out_l->target.out.nb;
359 __u32 oldpriority = trgt_out_l->target.out.rb_priority;
360 __u64 priority_sum;
361 __u32 numconns;
363 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
364 spin_lock(&(q->qlock));
366 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
367 BUG_ON(q->priority_sum < oldpriority);
368 nb->conns_waiting.priority_sum -= oldpriority;
369 q->priority_sum -= oldpriority;
371 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
372 nb->conns_waiting.priority_sum);
373 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
374 nb->conns_waiting.priority_sum += newpriority;
375 q->priority_sum += newpriority;
377 priority_sum = q->priority_sum;
378 numconns = q->numconns;
380 spin_unlock(&(q->qlock));
381 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
383 trgt_out_l->target.out.rb_priority = newpriority;
385 if (numconns <= 4) {
386 *maxsend_forcedelay = 1;
387 return div_u64(2048LL * ((__u64) newpriority) *
388 ((__u64) numconns), priority_sum);
389 } else {
390 *maxsend_forcedelay = 0;
391 return div_u64(1024LL * ((__u64) newpriority) *
392 ((__u64) numconns), priority_sum);
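/* Editorial note: called with conns_waiting.lock held once the active list
 * conns_waiting.lh has been drained; moves everything parked on lh_nextpass
 * back onto lh for another pass and returns 1 only if nothing is left to do
 * for this neighbor. */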
396 static int _resume_neighbors_nextpass(struct neighbor *nb_waitingconnslocked)
398 BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);
400 if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
401 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
402 return 1;
405 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
407 nb_waitingconnslocked->conns_waiting.lh.next =
408 nb_waitingconnslocked->conns_waiting.lh_nextpass.next;
409 nb_waitingconnslocked->conns_waiting.lh.prev =
410 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev;
411 nb_waitingconnslocked->conns_waiting.lh.next->prev =
412 &(nb_waitingconnslocked->conns_waiting.lh);
413 nb_waitingconnslocked->conns_waiting.lh.prev->next =
414 &(nb_waitingconnslocked->conns_waiting.lh);
415 nb_waitingconnslocked->conns_waiting.lh_nextpass.next =
416 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
417 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev =
418 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
420 return 0;
423 static int _resume_neighbors(struct qos_queue *q, struct neighbor *nb,
424 int *progress)
426 unsigned long iflags;
428 while (1) {
429 __u32 priority;
430 __u32 maxsend;
431 int maxsend_forcedelay;
433 int rc2;
434 __u32 sent2 = 0;
436 struct conn *cn = 0;
437 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
438 if (list_empty(&(nb->conns_waiting.lh)) != 0) {
439 int done = _resume_neighbors_nextpass(nb);
440 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
441 iflags);
442 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
444 BUG_ON(nb->conns_waiting.cnt == 0);
446 cn = container_of(nb->conns_waiting.lh.next, struct conn,
447 target.out.rb.lh);
448 BUG_ON(cn->targettype != TARGET_OUT);
449 BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
450 BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
451 (nb->conns_waiting.lh.prev !=
452 &(cn->target.out.rb.lh)));
453 list_del(&(cn->target.out.rb.lh));
454 list_add_tail(&(cn->target.out.rb.lh),
455 &(nb->conns_waiting.lh_nextpass));
456 kref_get(&(cn->ref));
457 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
460 priority = refresh_conn_priority(cn, 0);
462 spin_lock_bh(&(cn->rcv_lock));
464 if (unlikely(cn->targettype != TARGET_OUT)) {
465 spin_unlock_bh(&(cn->rcv_lock));
466 continue;
469 maxsend = _resume_conns_maxsend(q, cn, priority,
470 &maxsend_forcedelay);
471 if (cn->target.out.maxsend_extra >= maxsend)
472 maxsend_forcedelay = 0;
473 maxsend += cn->target.out.maxsend_extra;
474 if (unlikely(maxsend > U32_MAX))
475 maxsend = U32_MAX;
476 if (unlikely(maxsend >= 65536))
477 maxsend_forcedelay = 0;
479 rc2 = _flush_out(cn, maxsend, &sent2, 1, maxsend_forcedelay);
481 if (rc2 == RC_FLUSH_CONN_OUT_OK ||
482 rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
483 cn->target.out.maxsend_extra = 0;
484 qos_remove_conn(cn);
485 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
486 rc2 == RC_FLUSH_CONN_OUT_OOM)) {
487 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
488 if (likely(cn->target.out.rb.in_queue !=
489 RB_INQUEUE_FALSE)) {
490 list_del(&(cn->target.out.rb.lh));
491 list_add(&(cn->target.out.rb.lh),
492 &(nb->conns_waiting.lh));
494 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
495 iflags);
496 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
497 rc2 == RC_FLUSH_CONN_OUT_OOM) {
498 cn->target.out.maxsend_extra = 0;
499 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
500 if (unlikely(maxsend - sent2 > 65535))
501 cn->target.out.maxsend_extra = 65535;
502 else
503 cn->target.out.maxsend_extra = maxsend - sent2;
506 spin_unlock_bh(&(cn->rcv_lock));
508 if (sent2 != 0) {
509 *progress = 1;
510 wake_sender(cn);
513 kref_put(&(cn->ref), free_conn);
515 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
516 rc2 == RC_FLUSH_CONN_OUT_OOM) {
517 return QOS_RESUME_CONG;
522 static int resume_neighbors(struct qos_queue *q, int *sent)
524 unsigned long iflags;
526 spin_lock_irqsave(&(q->qlock), iflags);
528 while (1) {
529 struct neighbor *nb;
530 int rc;
532 if (list_empty(&(q->neighbors_waiting)) != 0) {
533 BUG_ON(q->numconns != 0);
534 spin_unlock_irqrestore(&(q->qlock), iflags);
535 return QOS_RESUME_DONE;
537 BUG_ON(q->numconns == 0);
539 nb = container_of(q->neighbors_waiting.next, struct neighbor,
540 rb.lh);
542 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
543 BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
544 BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
545 (q->neighbors_waiting.prev != &(nb->rb.lh)));
547 kref_get(&(nb->ref));
549 spin_unlock_irqrestore(&(q->qlock), iflags);
551 atomic_set(&(nb->cmsg_delay_conndata), 1);
553 rc = _resume_neighbors(q, nb, sent);
554 if (rc == QOS_RESUME_CONG) {
555 kref_put(&(nb->ref), neighbor_free);
556 return QOS_RESUME_CONG;
559 atomic_set(&(nb->cmsg_delay_conndata), 0);
560 spin_lock_bh(&(nb->cmsg_lock));
561 schedule_controlmsg_timer(nb);
562 spin_unlock_bh(&(nb->cmsg_lock));
564 spin_lock_irqsave(&(q->qlock), iflags);
565 if (rc == QOS_RESUME_DONE) {
566 if (nb->conns_waiting.cnt == 0 &&
567 nb->rb.in_queue == RB_INQUEUE_TRUE) {
568 nb->rb.in_queue = RB_INQUEUE_FALSE;
569 list_del(&(nb->rb.lh));
570 kref_put(&(nb->ref), kreffree_bug);
572 } else if (rc == QOS_RESUME_NEXTNEIGHBOR) {
573 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
574 list_del(&(nb->rb.lh));
575 list_add_tail(&(nb->rb.lh),
576 &(q->neighbors_waiting));
578 } else {
579 BUG();
582 kref_put(&(nb->ref), neighbor_free);
584 if (rc == QOS_RESUME_NEXTNEIGHBOR) {
585 spin_unlock_irqrestore(&(q->qlock), iflags);
586 return QOS_RESUME_NEXTNEIGHBOR;
591 static int send_retrans(struct neighbor *nb, int *sent);
593 static int _qos_resume(struct qos_queue *q, int caller, int *sent)
595 unsigned long iflags;
596 int rc = QOS_RESUME_DONE;
597 struct list_head *lh;
599 spin_lock_irqsave(&(q->qlock), iflags);
601 if (caller == QOS_CALLER_KPACKET)
602 lh = &(q->kpackets_waiting);
603 else if (caller == QOS_CALLER_CONN_RETRANS)
604 lh = &(q->conn_retrans_waiting);
605 else if (caller == QOS_CALLER_ANNOUNCE)
606 lh = &(q->announce_waiting);
607 else
608 BUG();
610 while (list_empty(lh) == 0) {
611 struct resume_block *rb = container_of(lh->next,
612 struct resume_block, lh);
613 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
614 rb->in_queue = RB_INQUEUE_FALSE;
615 list_del(&(rb->lh));
617 spin_unlock_irqrestore(&(q->qlock), iflags);
618 if (caller == QOS_CALLER_KPACKET) {
619 rc = send_messages(container_of(rb, struct neighbor,
620 rb_kp), sent);
621 } else if (caller == QOS_CALLER_CONN_RETRANS) {
622 rc = send_retrans(container_of(rb, struct neighbor,
623 rb_cr), sent);
624 } else if (caller == QOS_CALLER_ANNOUNCE) {
625 rc = _send_announce(container_of(rb,
626 struct announce_data, rb), 1, sent);
627 } else {
628 BUG();
630 spin_lock_irqsave(&(q->qlock), iflags);
632 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
633 rb->in_queue = RB_INQUEUE_TRUE;
634 list_add(&(rb->lh), lh);
635 break;
638 if (caller == QOS_CALLER_KPACKET) {
639 kref_put(&(container_of(rb, struct neighbor,
640 rb_kp)->ref), neighbor_free);
641 } else if (caller == QOS_CALLER_CONN_RETRANS) {
642 kref_put(&(container_of(rb, struct neighbor,
643 rb_cr)->ref), neighbor_free);
644 } else if (caller == QOS_CALLER_ANNOUNCE) {
645 kref_put(&(container_of(rb,
646 struct announce_data, rb)->ref),
647 announce_data_free);
648 } else {
649 BUG();
652 kref_put(&(q->ref), kreffree_bug);
655 spin_unlock_irqrestore(&(q->qlock), iflags);
657 return rc;
660 void qos_resume_taskfunc(unsigned long arg)
662 struct qos_queue *q = (struct qos_queue *) arg;
664 int rc;
665 int sent = 0;
666 unsigned long iflags;
667 int i = 0;
669 #warning todo limit runtime of resume task
671 spin_lock_irqsave(&(q->qlock), iflags);
673 while (i<4) {
674 struct list_head *lh;
676 rc = QOS_RESUME_DONE;
678 if (i == QOS_CALLER_KPACKET)
679 lh = &(q->kpackets_waiting);
680 else if (i == QOS_CALLER_CONN_RETRANS)
681 lh = &(q->conn_retrans_waiting);
682 else if (i == QOS_CALLER_ANNOUNCE)
683 lh = &(q->announce_waiting);
684 else if (i == QOS_CALLER_NEIGHBOR)
685 lh = &(q->neighbors_waiting);
686 else
687 BUG();
689 if (list_empty(lh)) {
690 i++;
691 continue;
694 spin_unlock_irqrestore(&(q->qlock), iflags);
695 if (i == QOS_CALLER_NEIGHBOR) {
696 rc = resume_neighbors(q, &sent);
697 } else {
698 rc = _qos_resume(q, i, &sent);
701 spin_lock_irqsave(&(q->qlock), iflags);
703 i = 0;
705 if (rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR)
706 break;
709 if (rc == QOS_RESUME_DONE) {
710 BUG_ON(!list_empty(&(q->kpackets_waiting)));
711 BUG_ON(!list_empty(&(q->conn_retrans_waiting)));
712 BUG_ON(!list_empty(&(q->announce_waiting)));
713 BUG_ON(!list_empty(&(q->neighbors_waiting)));
715 q->qos_resume_scheduled = 0;
716 } else {
717 unsigned long jiffies_tmp = jiffies;
718 unsigned long delay = (jiffies_tmp - q->jiffies_lastprogress +
719 3) / 4;
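/* editorial note: back off by a quarter of the time since the last
 * progress, clamped to [1, HZ/10] jiffies; any progress resets the
 * delay to a single jiffy */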
721 if (sent || unlikely(delay <= 0)) {
722 q->jiffies_lastprogress = jiffies_tmp;
723 delay = 1;
724 } else if (delay > HZ/10) {
725 q->jiffies_lastprogress = jiffies_tmp - (HZ*4)/10;
726 delay = HZ/10;
729 /* If we retry too fast here, we might starve layer 2 */
730 if (mod_timer(&(q->qos_resume_timer), jiffies_tmp + delay) ==
731 0) {
732 kref_get(&(q->ref));
736 qos_queue_set_congstatus(q);
738 spin_unlock_irqrestore(&(q->qlock), iflags);
741 static inline int qos_queue_is_destroyed(struct qos_queue *q_locked)
743 return q_locked->dev == 0;
746 #warning todo kref (kref_put if tasklet is scheduled)
747 void qos_resume_timerfunc(struct timer_list *qos_resume_timer)
749 unsigned long iflags;
750 struct qos_queue *q = container_of(qos_resume_timer,
751 struct qos_queue, qos_resume_timer);
752 spin_lock_irqsave(&(q->qlock), iflags);
753 if (likely(!qos_queue_is_destroyed(q)))
754 tasklet_schedule(&(q->qos_resume_task));
755 spin_unlock_irqrestore(&(q->qlock), iflags);
757 kref_put(&(q->ref), free_qos);
760 struct qos_queue *get_queue(struct net_device *dev)
762 struct qos_queue *ret = 0;
763 struct list_head *curr;
765 spin_lock_bh(&(queues_lock));
766 curr = queues.next;
767 while (curr != (&queues)) {
768 struct qos_queue *q = container_of(curr,
769 struct qos_queue, queue_list);
770 if (q->dev == dev) {
771 ret = q;
772 kref_get(&(ret->ref));
773 break;
775 curr = curr->next;
777 spin_unlock_bh(&(queues_lock));
778 return ret;
781 static void _destroy_queue(struct qos_queue *q, int caller)
783 struct list_head *lh;
785 if (caller == QOS_CALLER_KPACKET)
786 lh = &(q->kpackets_waiting);
787 else if (caller == QOS_CALLER_CONN_RETRANS)
788 lh = &(q->conn_retrans_waiting);
789 else if (caller == QOS_CALLER_ANNOUNCE)
790 lh = &(q->announce_waiting);
791 else if (caller == QOS_CALLER_NEIGHBOR)
792 lh = &(q->neighbors_waiting);
793 else
794 BUG();
796 while (list_empty(lh) == 0) {
797 struct list_head *curr = lh->next;
798 struct resume_block *rb = container_of(curr,
799 struct resume_block, lh);
800 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
801 rb->in_queue = RB_INQUEUE_FALSE;
802 list_del(curr);
804 if (caller == QOS_CALLER_KPACKET) {
805 kref_put(&(container_of(rb, struct neighbor,
806 rb_kp)->ref), neighbor_free);
807 } else if (caller == QOS_CALLER_CONN_RETRANS) {
808 kref_put(&(container_of(rb, struct neighbor,
809 rb_cr)->ref), neighbor_free);
810 } else if (caller == QOS_CALLER_ANNOUNCE) {
811 kref_put(&(container_of(rb,
812 struct announce_data, rb)->ref),
813 announce_data_free);
814 } else if (caller == QOS_CALLER_NEIGHBOR) {
815 kref_put(&(container_of(rb,
816 struct neighbor, rb)->ref),
817 neighbor_free);
818 } else {
819 BUG();
821 kref_put(&(q->ref), kreffree_bug);
825 static struct qos_queue *unlink_queue(struct net_device *dev)
827 struct qos_queue *ret = 0;
828 struct list_head *curr;
830 spin_lock_bh(&(queues_lock));
831 curr = queues.next;
832 while (curr != (&queues)) {
833 struct qos_queue *q = container_of(curr,
834 struct qos_queue, queue_list);
835 if (dev == 0 || q->dev == dev) {
836 ret = q;
837 kref_get(&(ret->ref));
839 list_del(&(q->queue_list));
840 kref_put(&(q->ref), kreffree_bug);
841 break;
843 curr = curr->next;
845 spin_unlock_bh(&(queues_lock));
846 return ret;
849 int destroy_queue(struct net_device *dev)
851 int rc = 1;
852 unsigned long iflags;
854 while (1) {
855 struct qos_queue *q = unlink_queue(dev);
857 if (q == 0)
858 break;
860 rc = 0;
862 spin_lock_irqsave(&(q->qlock), iflags);
863 if (q->dev != 0) {
864 dev_put(q->dev);
865 q->dev = 0;
867 _destroy_queue(q, QOS_CALLER_KPACKET);
868 _destroy_queue(q, QOS_CALLER_CONN_RETRANS);
869 _destroy_queue(q, QOS_CALLER_ANNOUNCE);
870 _destroy_queue(q, QOS_CALLER_NEIGHBOR);
871 spin_unlock_irqrestore(&(q->qlock), iflags);
873 tasklet_kill(&(q->qos_resume_task));
875 kref_put(&(q->ref), free_qos);
878 return rc;
881 int create_queue(struct net_device *dev)
883 struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);
885 if (q == 0) {
886 printk(KERN_ERR "cor: unable to allocate memory for device "
887 "queue, not enabling device");
888 return 1;
891 memset(q, 0, sizeof(struct qos_queue));
893 spin_lock_init(&(q->qlock));
895 kref_init(&(q->ref));
897 q->dev = dev;
898 dev_hold(dev);
900 timer_setup(&(q->qos_resume_timer), qos_resume_timerfunc, 0);
901 tasklet_init(&(q->qos_resume_task), qos_resume_taskfunc,
902 (unsigned long) q);
904 INIT_LIST_HEAD(&(q->kpackets_waiting));
905 INIT_LIST_HEAD(&(q->conn_retrans_waiting));
906 INIT_LIST_HEAD(&(q->announce_waiting));
907 INIT_LIST_HEAD(&(q->neighbors_waiting));
909 atomic_set(&(q->cong_status), 0);
911 spin_lock_bh(&(queues_lock));
912 list_add(&(q->queue_list), &queues);
913 spin_unlock_bh(&(queues_lock));
915 return 0;
918 static void qos_queue_set_congstatus(struct qos_queue *q_locked)
920 __u32 newstatus;
922 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
923 newstatus = CONGSTATUS_NONE;
924 } else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
925 newstatus = CONGSTATUS_KPACKETS;
926 } else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
927 newstatus = CONGSTATUS_RETRANS;
928 } else if (list_empty(&(q_locked->announce_waiting)) == 0) {
929 newstatus = CONGSTATUS_ANNOUNCE;
930 } else if (list_empty(&(q_locked->neighbors_waiting)) == 0) {
931 newstatus = CONGSTATUS_CONNDATA;
932 } else {
933 newstatus = CONGSTATUS_NONE;
936 atomic_set(&(q_locked->cong_status), newstatus);
939 void qos_set_lastdrop(struct qos_queue *q)
941 unsigned long iflags;
943 spin_lock_irqsave(&(q->qlock), iflags);
944 q->jiffies_lastdrop = jiffies;
945 qos_queue_set_congstatus(q);
946 spin_unlock_irqrestore(&(q->qlock), iflags);
950 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
951 * caller
953 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
954 int caller, int from_nbcongwin_resume)
956 int queues_empty;
958 if (rb->in_queue == RB_INQUEUE_TRUE) {
959 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
960 return;
961 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
962 from_nbcongwin_resume == 0) {
963 return;
966 if (unlikely(qos_queue_is_destroyed(q)))
967 return;
969 queues_empty = list_empty(&(q->kpackets_waiting)) &&
970 list_empty(&(q->conn_retrans_waiting)) &&
971 list_empty(&(q->announce_waiting)) &&
972 list_empty(&(q->neighbors_waiting));
974 BUG_ON(!queues_empty && q->qos_resume_scheduled == 0);
976 rb->in_queue = RB_INQUEUE_TRUE;
978 if (caller == QOS_CALLER_KPACKET) {
979 list_add(&(rb->lh), &(q->kpackets_waiting));
980 kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
981 } else if (caller == QOS_CALLER_CONN_RETRANS) {
982 list_add(&(rb->lh) , &(q->conn_retrans_waiting));
983 kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
984 } else if (caller == QOS_CALLER_ANNOUNCE) {
985 list_add(&(rb->lh), &(q->announce_waiting));
986 kref_get(&(container_of(rb, struct announce_data, rb)->ref));
987 } else if (caller == QOS_CALLER_NEIGHBOR) {
988 struct neighbor *nb = container_of(rb, struct neighbor, rb);
989 list_add(&(rb->lh), &(q->neighbors_waiting));
990 kref_get(&(nb->ref));
991 BUG_ON(nb->conns_waiting.cnt == 0);
992 q->numconns += nb->conns_waiting.cnt;
993 q->priority_sum += nb->conns_waiting.priority_sum;
994 } else {
995 BUG();
997 kref_get(&(q->ref));
999 if (q->qos_resume_scheduled == 0) {
1000 q->jiffies_lastprogress = jiffies;
1001 q->qos_resume_scheduled = 1;
1002 if (caller == QOS_CALLER_KPACKET || from_nbcongwin_resume) {
1003 tasklet_schedule(&(q->qos_resume_task));
1004 } else {
1005 if (mod_timer(&(q->qos_resume_timer), jiffies + 1) ==
1006 0) {
1007 kref_get(&(q->ref));
1012 qos_queue_set_congstatus(q);
1015 void qos_enqueue(struct qos_queue *q, struct resume_block *rb, int caller)
1017 unsigned long iflags;
1019 spin_lock_irqsave(&(q->qlock), iflags);
1020 _qos_enqueue(q, rb, caller, 0);
1021 spin_unlock_irqrestore(&(q->qlock), iflags);
1024 void qos_remove_conn(struct conn *trgt_out_lx)
1026 unsigned long iflags;
1027 struct neighbor *nb = trgt_out_lx->target.out.nb;
1028 struct qos_queue *q = nb->queue;
1029 int sched_cmsg = 0;
1030 int krefput_nb = 0;
1032 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1033 BUG_ON(q == 0);
1035 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1036 if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
1037 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1038 return;
1040 spin_lock(&(q->qlock));
1042 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
1043 list_del(&(trgt_out_lx->target.out.rb.lh));
1044 BUG_ON(nb->conns_waiting.cnt == 0);
1045 nb->conns_waiting.cnt--;
1046 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1047 BUG_ON(q->numconns == 0);
1048 q->numconns--;
1051 BUG_ON(nb->conns_waiting.priority_sum <
1052 trgt_out_lx->target.out.rb_priority);
1053 BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
1054 nb->conns_waiting.priority_sum -=
1055 trgt_out_lx->target.out.rb_priority;
1056 q->priority_sum -= trgt_out_lx->target.out.rb_priority;
1057 trgt_out_lx->target.out.rb_priority = 0;
1059 if (list_empty(&(nb->conns_waiting.lh)) &&
1060 list_empty(&(nb->conns_waiting.lh_nextpass))) {
1061 BUG_ON(nb->conns_waiting.priority_sum != 0);
1062 BUG_ON(nb->conns_waiting.cnt != 0);
1063 } else {
1064 BUG_ON(nb->conns_waiting.cnt == 0);
1067 if (list_empty(&(nb->conns_waiting.lh)) &&
1068 list_empty(&(nb->conns_waiting.lh_nextpass)) &&
1069 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1070 nb->rb.in_queue = RB_INQUEUE_FALSE;
1071 list_del(&(nb->rb.lh));
1072 if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
1073 atomic_set(&(nb->cmsg_delay_conndata), 0);
1074 sched_cmsg = 1;
1077 krefput_nb = 1;
1079 BUG_ON(list_empty(&(q->neighbors_waiting)) && q->numconns != 0);
1080 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1081 q->priority_sum != 0);
1083 qos_queue_set_congstatus(q);
1086 spin_unlock(&(q->qlock));
1087 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1089 if (sched_cmsg) {
1090 spin_lock_bh(&(nb->cmsg_lock));
1091 schedule_controlmsg_timer(nb);
1092 spin_unlock_bh(&(nb->cmsg_lock));
1095 kref_put(&(trgt_out_lx->ref), kreffree_bug);
1097 if (krefput_nb)
1098 kref_put(&(nb->ref), neighbor_free);
1101 static void qos_enqueue_conn(struct conn *trgt_out_lx)
1103 unsigned long iflags;
1104 struct neighbor *nb = trgt_out_lx->target.out.nb;
1105 struct qos_queue *q;
1107 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1109 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1111 if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
1112 goto out;
1114 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
1115 list_add(&(trgt_out_lx->target.out.rb.lh), &(nb->conns_waiting.lh));
1116 kref_get(&(trgt_out_lx->ref));
1117 nb->conns_waiting.cnt++;
1119 q = trgt_out_lx->target.out.nb->queue;
1120 spin_lock(&(q->qlock));
1121 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1122 q->numconns++;
1123 } else {
1124 _qos_enqueue(q, &(nb->rb), QOS_CALLER_NEIGHBOR, 0);
1126 spin_unlock(&(q->qlock));
1128 out:
1129 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1132 static struct sk_buff *create_packet(struct neighbor *nb, int size,
1133 gfp_t alloc_flags)
1135 struct sk_buff *ret;
1137 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1138 nb->dev->needed_tailroom, alloc_flags);
1139 if (unlikely(ret == 0))
1140 return 0;
1142 ret->protocol = htons(ETH_P_COR);
1143 ret->dev = nb->dev;
1145 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1146 if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1147 nb->dev->dev_addr, ret->len) < 0))
1148 return 0;
1149 skb_reset_network_header(ret);
1151 return ret;
1154 struct sk_buff *create_packet_cmsg(struct neighbor *nb, int size,
1155 gfp_t alloc_flags, __u64 seqno)
1157 struct sk_buff *ret;
1158 char *dest;
1160 ret = create_packet(nb, size + 7, alloc_flags);
1161 if (unlikely(ret == 0))
1162 return 0;
1164 dest = skb_put(ret, 7);
1165 BUG_ON(dest == 0);
1167 dest[0] = PACKET_TYPE_CMSG;
1168 dest += 1;
1170 put_u48(dest, seqno);
1171 dest += 6;
1173 return ret;
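/* Editorial note: conndata packets carry an 11 byte header in front of the
 * payload: 1 byte packet type, 4 byte conn_id and a 6 byte (48 bit) seqno,
 * which is why create_packet() is asked for size + 11 below. */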
1176 struct sk_buff *create_packet_conndata(struct neighbor *nb, int size,
1177 gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
1178 __u8 snd_delayed_lowbuf, __u8 flush)
1180 struct sk_buff *ret;
1181 char *dest;
1183 ret = create_packet(nb, size + 11, alloc_flags);
1184 if (unlikely(ret == 0))
1185 return 0;
1187 dest = skb_put(ret, 11);
1188 BUG_ON(dest == 0);
1190 if (flush != 0) {
1191 if (snd_delayed_lowbuf != 0) {
1192 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
1193 } else {
1194 dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
1196 } else {
1197 if (snd_delayed_lowbuf != 0) {
1198 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
1199 } else {
1200 dest[0] = PACKET_TYPE_CONNDATA;
1203 dest += 1;
1205 put_u32(dest, conn_id);
1206 dest += 4;
1207 put_u48(dest, seqno);
1208 dest += 6;
1210 return ret;
1213 void reschedule_conn_retrans_timer(struct neighbor *nb_retransconnlocked)
1215 struct conn_retrans *cr = 0;
1217 if (list_empty(&(nb_retransconnlocked->retrans_conn_list)))
1218 return;
1220 cr = container_of(nb_retransconnlocked->retrans_conn_list.next,
1221 struct conn_retrans, timeout_list);
1223 if (time_before_eq(cr->timeout, jiffies)) {
1224 qos_enqueue(nb_retransconnlocked->queue,
1225 &(nb_retransconnlocked->rb_cr),
1226 QOS_CALLER_CONN_RETRANS);
1227 } else {
1228 if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
1229 cr->timeout) == 0) {
1230 kref_get(&(nb_retransconnlocked->ref));
1236 * warning:
1237 * caller must also call kref_get/put, see reschedule_conn_retrans_timer
1239 static void cancel_conn_retrans(struct neighbor *nb_retransconnlocked,
1240 struct conn *trgt_out_lx, struct conn_retrans *cr,
1241 __u64 *bytes_acked)
1243 if (unlikely(cr->state == CONN_RETRANS_ACKED))
1244 return;
1246 if (cr->state == CONN_RETRANS_SCHEDULED) {
1247 list_del(&(cr->timeout_list));
1248 } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
1249 BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
1250 if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
1251 trgt_out_lx->target.out.retrans_lowwindow--;
1254 if (cr->state != CONN_RETRANS_INITIAL)
1255 *bytes_acked += cr->length;
1257 list_del(&(cr->conn_list));
1258 cr->state = CONN_RETRANS_ACKED;
1260 kref_put(&(cr->ref), free_connretrans);
1264 * nb->retrans_conn_lock must be held when calling this
1265 * (see schedule_retransmit_conn())
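 *
 * Editorial note: cancels every conn_retrans that lies entirely below
 * target.out.seqno_acked, trims the first one that straddles it and stops at
 * the first retransmit that is still completely unacked.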
1267 static void cancel_acked_conn_retrans(struct conn *trgt_out_l,
1268 __u64 *bytes_acked)
1270 __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;
1272 while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
1273 struct conn_retrans *cr = container_of(
1274 trgt_out_l->target.out.retrans_list.next,
1275 struct conn_retrans, conn_list);
1277 if (seqno_after(cr->seqno + cr->length, seqno_acked)) {
1278 if (seqno_before(cr->seqno, seqno_acked)) {
1279 *bytes_acked += seqno_clean(seqno_acked -
1280 cr->seqno);
1281 cr->length -= seqno_clean(seqno_acked -
1282 cr->seqno);
1283 cr->seqno = seqno_acked;
1285 break;
1288 cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l, cr,
1289 bytes_acked);
1292 reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
1295 void cancel_all_conn_retrans(struct conn *trgt_out_lx)
1297 struct neighbor *nb = trgt_out_lx->target.out.nb;
1298 __u64 bytes_acked = 0;
1300 spin_lock_bh(&(nb->retrans_conn_lock));
1302 while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
1303 struct conn_retrans *cr = container_of(
1304 trgt_out_lx->target.out.retrans_list.next,
1305 struct conn_retrans, conn_list);
1306 BUG_ON(cr->trgt_out_o != trgt_out_lx);
1308 cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
1311 reschedule_conn_retrans_timer(nb);
1313 spin_unlock_bh(&(nb->retrans_conn_lock));
1315 if (bytes_acked > 0)
1316 nbcongwin_data_acked(nb, bytes_acked);
1319 static void cancel_all_conn_retrans_nb(struct neighbor *nb)
1321 __u64 bytes_acked = 0;
1323 while (1) {
1324 struct conn_retrans *cr;
1326 spin_lock_bh(&(nb->retrans_conn_lock));
1328 if (list_empty(&(nb->retrans_conn_list))) {
1329 spin_unlock_bh(&(nb->retrans_conn_lock));
1330 break;
1333 cr = container_of(nb->retrans_conn_list.next,
1334 struct conn_retrans, timeout_list);
1336 kref_get(&(cr->ref));
1338 spin_unlock_bh(&(nb->retrans_conn_lock));
1341 spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
1342 spin_lock_bh(&(nb->retrans_conn_lock));
1344 if (likely(cr == container_of(nb->retrans_conn_list.next,
1345 struct conn_retrans, timeout_list)))
1346 cancel_conn_retrans(nb, cr->trgt_out_o, cr,
1347 &bytes_acked);
1349 spin_unlock_bh(&(nb->retrans_conn_lock));
1350 spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));
1352 kref_put(&(cr->ref), free_connretrans);
1355 if (bytes_acked > 0)
1356 nbcongwin_data_acked(nb, bytes_acked);
1359 static struct conn_retrans *prepare_conn_retrans(struct conn *trgt_out_l,
1360 __u64 seqno, __u32 len, __u8 snd_delayed_lowbuf,
1361 struct conn_retrans *cr_splitted, int retransconnlocked)
1363 struct neighbor *nb = trgt_out_l->target.out.nb;
1365 struct conn_retrans *cr = kmem_cache_alloc(connretrans_slab,
1366 GFP_ATOMIC);
1368 if (unlikely(cr == 0))
1369 return 0;
1371 BUG_ON(trgt_out_l->isreset != 0);
1373 memset(cr, 0, sizeof (struct conn_retrans));
1374 cr->trgt_out_o = trgt_out_l;
1375 kref_get(&(trgt_out_l->ref));
1376 cr->seqno = seqno;
1377 cr->length = len;
1378 cr->snd_delayed_lowbuf = snd_delayed_lowbuf;
1379 kref_init(&(cr->ref));
1381 kref_get(&(cr->ref));
1382 if (retransconnlocked == 0)
1383 spin_lock_bh(&(nb->retrans_conn_lock));
1385 if (cr_splitted != 0)
1386 list_add(&(cr->conn_list), &(cr_splitted->conn_list));
1387 else
1388 list_add_tail(&(cr->conn_list),
1389 &(cr->trgt_out_o->target.out.retrans_list));
1391 if (retransconnlocked == 0)
1392 spin_unlock_bh(&(nb->retrans_conn_lock));
1394 return cr;
1397 #define RC_SENDRETRANS_OK 0
1398 #define RC_SENDRETRANS_OOM 1
1399 #define RC_SENDRETRANS_QUEUEFULL 2
1400 #define RC_SENDRETRANS_QUEUEFULLDROPPED 3
1402 static int __send_retrans(struct neighbor *nb, struct conn *trgt_out_l,
1403 struct conn_retrans *cr, __u64 *bytes_sent)
1405 __u8 flush = 0;
1407 BUG_ON(cr->length == 0);
1409 if (trgt_out_l->flush != 0 && seqno_eq(cr->seqno + cr->length,
1410 trgt_out_l->target.out.seqno_nextsend) &&
1411 trgt_out_l->data_buf.read_remaining == 0)
1412 flush = 1;
1414 if (send_conndata_as_skb(nb, cr->length)) {
1415 struct sk_buff *skb;
1416 char *dst;
1417 int rc;
1419 skb = create_packet_conndata(nb, cr->length, GFP_ATOMIC,
1420 trgt_out_l->target.out.conn_id, cr->seqno,
1421 cr->snd_delayed_lowbuf, flush);
1422 if (unlikely(skb == 0))
1423 return RC_SENDRETRANS_OOM;
1425 dst = skb_put(skb, cr->length);
1427 databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);
1429 rc = cor_dev_queue_xmit(skb, nb->queue,
1430 QOS_CALLER_CONN_RETRANS);
1431 if (rc == NET_XMIT_DROP)
1432 return RC_SENDRETRANS_QUEUEFULLDROPPED;
1433 schedule_retransmit_conn(cr, 1, 0);
1434 if (rc != NET_XMIT_SUCCESS)
1435 return RC_SENDRETRANS_QUEUEFULL;
1437 } else {
1438 struct control_msg_out *cm;
1439 char *buf;
1441 buf = kmalloc(cr->length, GFP_ATOMIC);
1442 if (unlikely(buf == 0))
1443 return RC_SENDRETRANS_OOM;
1445 cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
1446 if (unlikely(cm == 0)) {
1447 kfree(buf);
1448 return RC_SENDRETRANS_OOM;
1451 databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);
1453 send_conndata(cm, trgt_out_l->target.out.conn_id,
1454 cr->seqno, buf, buf, cr->length,
1455 cr->snd_delayed_lowbuf, flush,
1456 trgt_out_l->is_highlatency, cr);
1459 *bytes_sent += cr->length;
1461 return RC_SENDRETRANS_OK;
1464 static int _send_retrans_splitcr_ifneeded(struct neighbor *nb_retransconnlocked,
1465 struct conn *trgt_out_l, struct conn_retrans *cr)
1467 __u32 targetmss = mss_conndata(nb_retransconnlocked,
1468 trgt_out_l->is_highlatency != 0);
1469 __u64 windowlimit = seqno_clean(
1470 trgt_out_l->target.out.seqno_windowlimit -
1471 cr->seqno);
1472 __u32 maxsize = targetmss;
1473 if (windowlimit < maxsize)
1474 maxsize = windowlimit;
1476 if (unlikely(cr->length > maxsize)) {
1477 struct conn_retrans *cr2 = prepare_conn_retrans(trgt_out_l,
1478 cr->seqno + maxsize, cr->length - maxsize,
1479 cr->snd_delayed_lowbuf, cr, 1);
1480 if (unlikely(cr2 == 0))
1481 return RC_SENDRETRANS_OOM;
1483 cr2->timeout = cr->timeout;
1485 list_add(&(cr2->timeout_list),
1486 &(nb_retransconnlocked->retrans_conn_list));
1487 cr2->state = CONN_RETRANS_SCHEDULED;
1489 cr->length = maxsize;
1492 return RC_SENDRETRANS_OK;
1495 static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr,
1496 __u64 *bytes_sent)
1499 struct conn *trgt_out_o = cr->trgt_out_o;
1500 int rc = RC_SENDRETRANS_OK;
1502 spin_lock_bh(&(trgt_out_o->rcv_lock));
1504 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1505 BUG_ON(trgt_out_o->target.out.nb != nb);
1507 spin_lock_bh(&(nb->retrans_conn_lock));
1508 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1509 spin_unlock_bh(&(nb->retrans_conn_lock));
1510 goto out;
1513 BUG_ON(trgt_out_o->isreset != 0);
1515 BUG_ON(seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));
1517 if (seqno_after_eq(cr->seqno,
1518 trgt_out_o->target.out.seqno_windowlimit)) {
1519 BUG_ON(cr->state != CONN_RETRANS_SENDING);
1520 cr->state = CONN_RETRANS_LOWWINDOW;
1521 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1522 trgt_out_o->target.out.retrans_lowwindow++;
1524 spin_unlock_bh(&(nb->retrans_conn_lock));
1525 goto out;
1528 rc = _send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);
1530 spin_unlock_bh(&(nb->retrans_conn_lock));
1532 kref_get(&(trgt_out_o->ref));
1534 if (rc == RC_SENDRETRANS_OK)
1535 rc = __send_retrans(nb, trgt_out_o, cr, bytes_sent);
1537 if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
1538 spin_lock_bh(&(nb->retrans_conn_lock));
1539 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1540 } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
1541 if (rc == RC_SENDRETRANS_OOM)
1542 cr->timeout = jiffies + 1;
1543 list_add(&(cr->timeout_list), &(nb->retrans_conn_list));
1544 cr->state = CONN_RETRANS_SCHEDULED;
1545 } else {
1546 BUG();
1548 spin_unlock_bh(&(nb->retrans_conn_lock));
1551 out:
1552 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1554 kref_put(&(trgt_out_o->ref), free_conn);
1556 return (rc == RC_SENDRETRANS_OOM ||
1557 rc == RC_SENDRETRANS_QUEUEFULL ||
1558 rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
1561 static int send_retrans(struct neighbor *nb, int *sent)
1563 int queuefull = 0;
1564 int nbstate = get_neigh_state(nb);
1565 __u64 bytes_sent = 0;
1567 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
1568 return QOS_RESUME_DONE;
1569 } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1571 * cancel_all_conn_retrans_nb should not be needed, because
1572 * reset_all_conns calls cancel_all_conn_retrans
1574 cancel_all_conn_retrans_nb(nb);
1575 return QOS_RESUME_DONE;
1578 while (1) {
1579 struct conn_retrans *cr = 0;
1581 spin_lock_bh(&(nb->retrans_conn_lock));
1583 if (list_empty(&(nb->retrans_conn_list))) {
1584 spin_unlock_bh(&(nb->retrans_conn_lock));
1585 break;
1588 cr = container_of(nb->retrans_conn_list.next,
1589 struct conn_retrans, timeout_list);
1591 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1593 if (time_after(cr->timeout, jiffies)) {
1594 spin_unlock_bh(&(nb->retrans_conn_lock));
1595 break;
1598 kref_get(&(cr->ref));
1599 list_del(&(cr->timeout_list));
1600 cr->state = CONN_RETRANS_SENDING;
1602 spin_unlock_bh(&(nb->retrans_conn_lock));
1604 queuefull = _send_retrans(nb, cr, &bytes_sent);
1605 kref_put(&(cr->ref), free_connretrans);
1606 if (queuefull) {
1607 break;
1608 } else {
1609 *sent = 1;
1613 if (bytes_sent > 0)
1614 nbcongwin_data_retransmitted(nb, bytes_sent);
1616 return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
1619 void retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
1621 struct neighbor *nb = container_of(retrans_conn_timer,
1622 struct neighbor, retrans_conn_timer);
1623 qos_enqueue(nb->queue, &(nb->rb_cr), QOS_CALLER_CONN_RETRANS);
1624 kref_put(&(nb->ref), neighbor_free);
1627 static void conn_ack_ooo_rcvd_splitcr(struct conn *trgt_out_l,
1628 struct conn_retrans *cr, __u64 seqno_ooo, __u32 length,
1629 __u64 *bytes_acked)
1631 struct conn_retrans *cr2;
1632 __u64 seqno_cr2start;
1633 __u32 oldcrlength = cr->length;
1635 if (cr->state != CONN_RETRANS_SCHEDULED &&
1636 cr->state != CONN_RETRANS_LOWWINDOW)
1637 return;
1639 seqno_cr2start = seqno_ooo+length;
1640 cr2 = prepare_conn_retrans(trgt_out_l, seqno_cr2start,
1641 seqno_clean(cr->seqno + cr->length - seqno_cr2start),
1642 cr->snd_delayed_lowbuf, cr, 1);
1644 if (unlikely(cr2 == 0))
1645 return;
1647 BUG_ON(cr2->length > cr->length);
1649 cr2->timeout = cr->timeout;
1650 cr2->state = cr->state;
1652 if (cr->state != CONN_RETRANS_SCHEDULED)
1653 list_add(&(cr2->timeout_list), &(cr->timeout_list));
1655 BUG_ON(seqno_clean(seqno_ooo - cr->seqno) > cr->length);
1657 cr->length -= seqno_clean(seqno_ooo - cr->seqno);
1658 BUG_ON(cr->length + length + cr2->length != oldcrlength);
1660 *bytes_acked += length;
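/* Editorial note: conn_ack_ooo_rcvd() processes a selective (out-of-order)
 * ack for [seqno_ooo, seqno_ooo + length): retransmits fully inside the range
 * are cancelled, ones overlapping only at their start or end are trimmed, and
 * a retransmit spanning the whole range is split with
 * conn_ack_ooo_rcvd_splitcr() above. */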
1663 void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
1664 struct conn *trgt_out, __u64 seqno_ooo, __u32 length,
1665 __u64 *bytes_acked)
1667 struct list_head *curr;
1669 if (unlikely(length == 0))
1670 return;
1672 spin_lock_bh(&(trgt_out->rcv_lock));
1674 if (unlikely(trgt_out->targettype != TARGET_OUT))
1675 goto out;
1676 if (unlikely(trgt_out->target.out.nb != nb))
1677 goto out;
1678 if (unlikely(trgt_out->target.out.conn_id != conn_id))
1679 goto out;
1681 kref_get(&(nb->ref));
1682 spin_lock_bh(&(nb->retrans_conn_lock));
1684 curr = trgt_out->target.out.retrans_list.next;
1686 while (curr != &(trgt_out->target.out.retrans_list)) {
1687 struct conn_retrans *cr = container_of(curr,
1688 struct conn_retrans, conn_list);
1690 int ack_covers_start = seqno_after_eq(cr->seqno, seqno_ooo);
1691 int ack_covers_end = seqno_before_eq(cr->seqno + cr->length,
1692 seqno_ooo + length);
1694 curr = curr->next;
1696 if (seqno_before(cr->seqno + cr->length, seqno_ooo))
1697 continue;
1699 if (seqno_after(cr->seqno, seqno_ooo + length))
1700 break;
1702 if (likely(ack_covers_start && ack_covers_end)) {
1703 cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
1704 reschedule_conn_retrans_timer(nb);
1705 } else if (ack_covers_start) {
1706 __u32 diff = seqno_ooo + length - cr->seqno -
1707 cr->length;
1708 BUG_ON(diff >= cr->length);
1709 cr->seqno += diff;
1710 cr->length -= diff;
1711 *bytes_acked += diff;
1712 } else if (ack_covers_end) {
1713 __u32 diff = seqno_ooo + length - cr->seqno;
1714 BUG_ON(diff >= length);
1715 cr->length -= diff;
1716 *bytes_acked += diff;
1717 } else {
1718 conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
1719 length, bytes_acked);
1720 break;
1724 if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
1725 trgt_out->target.out.seqno_acked =
1726 trgt_out->target.out.seqno_nextsend;
1727 } else {
1728 struct conn_retrans *cr = container_of(
1729 trgt_out->target.out.retrans_list.next,
1730 struct conn_retrans, conn_list);
1731 if (seqno_after(cr->seqno, trgt_out->target.out.seqno_acked))
1732 trgt_out->target.out.seqno_acked = cr->seqno;
1735 spin_unlock_bh(&(nb->retrans_conn_lock));
1736 kref_put(&(nb->ref), neighbor_free);
1738 out:
1739 spin_unlock_bh(&(trgt_out->rcv_lock));
1742 static void _conn_ack_rcvd_nosendwin(struct conn *trgt_out_l)
1744 if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
1745 trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
1746 trgt_out_l->bufsize.state = BUFSIZE_NOACTION;
1748 if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
1749 trgt_out_l->bufsize.act.noact.bytesleft = max(
1750 trgt_out_l->bufsize.act.noact.bytesleft,
1751 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1753 trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
1754 trgt_out_l->bufsize.ignore_rcv_lowbuf,
1755 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1759 * nb->retrans_conn_lock must be held when calling this
1760 * (see schedule_retransmit_conn())
1762 static void reschedule_lowwindow_retrans(struct conn *trgt_out_l)
1764 struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
1765 int cnt = 0;
1767 while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
1768 struct conn_retrans *cr;
1770 if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
1771 BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
1772 65535);
1773 trgt_out_l->target.out.retrans_lowwindow = 0;
1774 break;
1777 cr = container_of(lh, struct conn_retrans, conn_list);
1779 if (seqno_after_eq(cr->seqno,
1780 trgt_out_l->target.out.seqno_windowlimit)) {
1781 break;
1784 if (cr->state == CONN_RETRANS_LOWWINDOW)
1785 schedule_retransmit_conn(cr, 1, 1);
1787 lh = lh->next;
1788 cnt++;
1792 void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
1793 __u64 seqno, int setwindow, __u8 window, __u64 *bytes_acked)
1795 int seqno_advanced = 0;
1796 int window_enlarged = 0;
1798 spin_lock_bh(&(trgt_out->rcv_lock));
1800 if (unlikely(trgt_out->isreset != 0))
1801 goto out;
1802 if (unlikely(trgt_out->targettype != TARGET_OUT))
1803 goto out;
1804 if (unlikely(trgt_out->target.out.nb != nb))
1805 goto out;
1806 if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
1807 goto out;
1809 if (unlikely(seqno_after(seqno, trgt_out->target.out.seqno_nextsend) ||
1810 seqno_before(seqno, trgt_out->target.out.seqno_acked)))
1811 goto out;
1813 if (setwindow) {
1814 __u64 windowdec = dec_log_64_7(window);
1815 if (likely(seqno_after(seqno,
1816 trgt_out->target.out.seqno_acked)) ||
1817 seqno_after(seqno + windowdec,
1818 trgt_out->target.out.seqno_windowlimit)) {
1819 trgt_out->target.out.seqno_windowlimit = seqno +
1820 windowdec;
1821 window_enlarged = 1;
1825 if (seqno_after(seqno, trgt_out->target.out.seqno_acked))
1826 seqno_advanced = 1;
1828 if (seqno_advanced == 0 && window_enlarged == 0)
1829 goto out;
1831 kref_get(&(nb->ref));
1832 spin_lock_bh(&(nb->retrans_conn_lock));
1834 if (seqno_advanced) {
1835 trgt_out->target.out.seqno_acked = seqno;
1836 cancel_acked_conn_retrans(trgt_out, bytes_acked);
1839 if (window_enlarged)
1840 reschedule_lowwindow_retrans(trgt_out);
1842 spin_unlock_bh(&(nb->retrans_conn_lock));
1843 kref_put(&(nb->ref), neighbor_free);
1845 if (seqno_advanced)
1846 databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);
1848 if (seqno_eq(trgt_out->target.out.seqno_acked,
1849 trgt_out->target.out.seqno_nextsend))
1850 _conn_ack_rcvd_nosendwin(trgt_out);
1852 out:
1853 if (seqno_advanced || window_enlarged)
1854 flush_buf(trgt_out);
1856 spin_unlock_bh(&(trgt_out->rcv_lock));
1858 wake_sender(trgt_out);
1861 static void try_combine_conn_retrans_prev(struct neighbor *nb_retransconnlocked,
1862 struct conn *trgt_out_lx, struct conn_retrans *cr)
1864 struct conn_retrans *cr_prev;
1865 __u64 bytes_dummyacked = 0;
1867 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1869 if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
1870 return;
1872 cr_prev = container_of(cr->conn_list.prev, struct conn_retrans,
1873 conn_list);
1875 if (cr_prev->state != CONN_RETRANS_SCHEDULED)
1876 return;
1877 if (cr_prev->timeout != cr->timeout)
1878 return;
1879 if (!seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
1880 return;
1882 cr->seqno -= cr_prev->length;
1883 cr->length += cr_prev->length;
1885 cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
1886 &bytes_dummyacked);
1889 static void try_combine_conn_retrans_next(struct neighbor *nb_retranslocked,
1890 struct conn *trgt_out_lx, struct conn_retrans *cr)
1892 struct conn_retrans *cr_next;
1893 __u64 bytes_dummyacked = 0;
1895 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1897 if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
1898 return;
1900 cr_next = container_of(cr->conn_list.next, struct conn_retrans,
1901 conn_list);
1903 if (cr_next->state != CONN_RETRANS_SCHEDULED)
1904 return;
1905 if (cr_next->timeout != cr->timeout)
1906 return;
1907 if (!seqno_eq(cr->seqno + cr->length, cr_next->seqno))
1908 return;
1910 cr->length += cr_next->length;
1912 cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
1913 &bytes_dummyacked);
1916 void schedule_retransmit_conn(struct conn_retrans *cr, int connlocked,
1917 int nbretransconn_locked)
1919 struct conn *trgt_out_o = cr->trgt_out_o;
1920 struct neighbor *nb;
1921 int first;
1923 if (connlocked == 0)
1924 spin_lock_bh(&(trgt_out_o->rcv_lock));
1926 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1927 nb = trgt_out_o->target.out.nb;
1929 cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
1930 atomic_read(&(nb->latency_stddev_retrans_us)),
1931 atomic_read(&(nb->max_remote_ackconn_delay_us)));
1933 if (nbretransconn_locked == 0)
1934 spin_lock_bh(&(nb->retrans_conn_lock));
1936 kref_get(&(nb->ref));
1938 BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);
1940 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1941 goto out;
1942 } else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
1943 BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
1944 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1945 trgt_out_o->target.out.retrans_lowwindow--;
1948 first = unlikely(list_empty(&(nb->retrans_conn_list)));
1949 list_add_tail(&(cr->timeout_list), &(nb->retrans_conn_list));
1950 cr->state = CONN_RETRANS_SCHEDULED;
1952 if (unlikely(first)) {
1953 reschedule_conn_retrans_timer(nb);
1954 } else {
1955 try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
1956 try_combine_conn_retrans_next(nb, trgt_out_o, cr);
1959 out:
1960 if (nbretransconn_locked == 0)
1961 spin_unlock_bh(&(nb->retrans_conn_lock));
1963 kref_put(&(nb->ref), neighbor_free);
1965 if (connlocked == 0)
1966 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1969 static int _flush_out_skb(struct conn *trgt_out_lx, __u32 len,
1970 __u8 snd_delayed_lowbuf)
1972 struct neighbor *nb = trgt_out_lx->target.out.nb;
1974 __u64 seqno;
1975 struct conn_retrans *cr;
1976 struct sk_buff *skb;
1977 char *dst;
1978 __u8 flush;
1979 int rc;
1981 if (trgt_out_lx->flush != 0 &&
1982 trgt_out_lx->data_buf.read_remaining == len)
1983 flush = 1;
1985 seqno = trgt_out_lx->target.out.seqno_nextsend;
1986 skb = create_packet_conndata(trgt_out_lx->target.out.nb, len,
1987 GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
1988 snd_delayed_lowbuf, flush);
1989 if (unlikely(skb == 0))
1990 return RC_FLUSH_CONN_OUT_OOM;
1992 cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
1993 0, 0);
1994 if (unlikely(cr == 0)) {
1995 kfree_skb(skb);
1996 return RC_FLUSH_CONN_OUT_OOM;
1999 dst = skb_put(skb, len);
2001 databuf_pull(trgt_out_lx, dst, len);
2003 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
2004 if (rc == NET_XMIT_DROP) {
2005 databuf_unpull(trgt_out_lx, len);
2006 spin_lock_bh(&(nb->retrans_conn_lock));
2007 cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
2008 spin_unlock_bh(&(nb->retrans_conn_lock));
2009 kref_put(&(cr->ref), free_connretrans);
2010 return RC_FLUSH_CONN_OUT_CONG;
2013 trgt_out_lx->target.out.seqno_nextsend += len;
2014 nbcongwin_data_sent(nb, len);
2015 schedule_retransmit_conn(cr, 1, 0);
2016 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
2017 update_src_sock_sndspeed(trgt_out_lx, len);
2019 kref_put(&(cr->ref), free_connretrans);
2021 return (rc == NET_XMIT_SUCCESS) ?
2022 RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
2025 static int _flush_out_conndata(struct conn *trgt_out_lx, __u32 len,
2026 __u8 snd_delayed_lowbuf)
2028 __u64 seqno;
2029 struct control_msg_out *cm;
2030 struct conn_retrans *cr;
2031 char *buf;
2032 __u8 flush = 0;
2034 if (trgt_out_lx->flush != 0 &&
2035 trgt_out_lx->data_buf.read_remaining == len)
2036 flush = 1;
2038 buf = kmalloc(len, GFP_ATOMIC);
2040 if (unlikely(buf == 0))
2041 return RC_FLUSH_CONN_OUT_OOM;
2043 cm = alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
2044 if (unlikely(cm == 0)) {
2045 kfree(buf);
2046 return RC_FLUSH_CONN_OUT_OOM;
2049 seqno = trgt_out_lx->target.out.seqno_nextsend;
2051 cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
2052 0, 0);
2053 if (unlikely(cr == 0)) {
2054 kfree(buf);
2055 free_control_msg(cm);
2056 return RC_FLUSH_CONN_OUT_OOM;
2059 databuf_pull(trgt_out_lx, buf, len);
2060 trgt_out_lx->target.out.seqno_nextsend += len;
2061 nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
2062 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
2063 update_src_sock_sndspeed(trgt_out_lx, len);
2065 send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf, len,
2066 snd_delayed_lowbuf, flush, trgt_out_lx->is_highlatency,
2067 cr);
2069 return RC_FLUSH_CONN_OUT_OK;
2072 int srcin_buflimit_reached(struct conn *src_in_lx)
2074 __u64 window_left;
2076 if (unlikely(seqno_before(src_in_lx->source.in.window_seqnolimit,
2077 src_in_lx->source.in.next_seqno)))
2078 return 1;
2080 window_left = seqno_clean(src_in_lx->source.in.window_seqnolimit -
2081 src_in_lx->source.in.next_seqno);
2083 if (window_left < WINDOW_ENCODE_MIN)
2084 return 1;
2086 if (window_left/2 < src_in_lx->data_buf.read_remaining)
2087 return 1;
2089 return 0;
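/* Editorial note: rounds the remaining send budget down to a convenient
 * packet length: values below 128 are returned unchanged, values up to 4095
 * are rounded down to a power of two and larger values to a multiple of
 * 4096 (e.g. 300 -> 256, 5000 -> 4096, 9000 -> 8192). */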
2092 static __u32 maxsend_left_to_len(__u32 maxsend_left)
2094 __u32 i;
2095 if (maxsend_left < 128)
2096 return maxsend_left;
2098 for (i=128;i<4096;) {
2099 if (i*2 > maxsend_left)
2100 return i;
2101 i = i*2;
2104 return maxsend_left - maxsend_left%4096;
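/* Editorial note: returns 1 when sending sndlen bytes would get close to the
 * announced window, i.e. the window is no larger than sndlen or the space
 * left afterwards would fall below 1/4 (high latency conns) or 1/8
 * (low latency conns) of the bytes that would then be waiting for an ack. */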
static int seqno_low_sendlimit(struct conn *trgt_out_lx, __u64 windowlimit,
		__u32 sndlen)
{
	__u64 bytes_ackpending;

	BUG_ON(seqno_before(trgt_out_lx->target.out.seqno_nextsend,
			trgt_out_lx->target.out.seqno_acked));

	bytes_ackpending = seqno_clean(trgt_out_lx->target.out.seqno_nextsend -
			trgt_out_lx->target.out.seqno_acked);

	if (windowlimit <= sndlen)
		return 1;

	if (unlikely(bytes_ackpending + sndlen < bytes_ackpending))
		return 0;

	if (trgt_out_lx->is_highlatency != 0)
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 4)
				? 1 : 0;
	else
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 8)
				? 1 : 0;
}

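/* Called when the send window limits us: let the receive side ignore its
 * lowbuf limit up to the currently configured bufsize. */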
static void _flush_out_ignore_lowbuf(struct conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

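/* Number of bytes that may still be sent before reaching the window
 * announced by the peer (0 if seqno_nextsend is already at or past it). */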
static __u64 get_windowlimit(struct conn *trgt_out_lx)
{
	if (unlikely(seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_nextsend)))
		return 0;

	return seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_nextsend);
}

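/**
 * Flush buffered data of a locked target-out conn towards its neighbor.
 * Full targetmss sized chunks are sent as long as the send window, the
 * neighbor congestion window and the maxsend budget allow. A trailing chunk
 * smaller than targetmss is only sent if the conn is being flushed, the
 * source side cannot supply more data for now, or the window/budget forces
 * a partial send. *sent is incremented by the number of bytes handed down;
 * the return value is one of the RC_FLUSH_CONN_OUT_* codes.
 */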
static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos, int maxsend_forcedelay)
{
	struct neighbor *nb = trgt_out_lx->target.out.nb;

	__u32 targetmss;

	int nbstate;

	__u8 snd_delayed_lowbuf = trgt_out_lx->target.out.windowlimit_reached;

	__u32 maxsend_left = maxsend;

	trgt_out_lx->target.out.windowlimit_reached = 0;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->target.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

#warning todo burst queue
	if (from_qos == 0 && qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	spin_lock_bh(&(nb->stalledconn_lock));
	nbstate = get_neigh_state(nb);
	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next != 0);
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next == 0);

		if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
			kref_get(&(trgt_out_lx->ref));
			list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
					&(nb->stalledconn_list));
		}
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
			get_windowlimit(trgt_out_l),
			trgt_out_l->data_buf.read_remaining); */

	targetmss = mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, targetmss)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (likely(send_conndata_as_skb(nb, targetmss)))
			rc = _flush_out_skb(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left == maxsend && maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0 &&
					!maxsend_forcedelay) {
				len = maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				srcin_buflimit_reached(trgt_out_lx) == 0 && (
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cpacket_write_allowed(trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, len)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (len > windowlimit) {
			len = windowlimit;
			_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (send_conndata_as_skb(nb, len))
			rc = _flush_out_skb(trgt_out_lx, len,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, len,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}

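/* Flush with a practically unlimited budget; on congestion, OOM or an
 * exhausted budget the conn is re-queued on its qos queue for a retry. */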
int flush_out(struct conn *trgt_out_lx, __u32 *sent)
{
	int rc = _flush_out(trgt_out_lx, 1 << 30, sent, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_CONG || rc == RC_FLUSH_CONN_OUT_MAXSENT ||
			rc == RC_FLUSH_CONN_OUT_OOM)
		qos_enqueue_conn(trgt_out_lx);

	return rc;
}

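/**
 * Work item: retry flushing the conns that were parked on
 * nb->stalledconn_list while the neighbor was stalled. Stops early if the
 * neighbor is still not active and wakes the sender of every conn for which
 * data could be sent.
 */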
void resume_nbstalled_conns(struct work_struct *work)
{
	struct neighbor *nb = container_of(work, struct neighbor,
			stalledconn_work);
	int rc = RC_FLUSH_CONN_OUT_OK;

	spin_lock_bh(&(nb->stalledconn_lock));
	nb->stalledconn_work_scheduled = 0;
	while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
			list_empty(&(nb->stalledconn_list)) == 0) {
		struct list_head *lh = nb->stalledconn_list.next;
		struct conn *trgt_out = container_of(lh, struct conn,
				target.out.nbstalled_lh);
		__u32 sent = 0;
		BUG_ON(trgt_out->targettype != TARGET_OUT);
		list_del(lh);
		lh->prev = 0;
		lh->next = 0;

		spin_unlock_bh(&(nb->stalledconn_lock));

		spin_lock_bh(&(trgt_out->rcv_lock));
		if (likely(trgt_out->targettype == TARGET_OUT))
			rc = flush_out(trgt_out, &sent);
		spin_unlock_bh(&(trgt_out->rcv_lock));

		if (sent != 0)
			wake_sender(trgt_out);

		kref_put(&(trgt_out->ref), free_conn);

		spin_lock_bh(&(nb->stalledconn_lock));
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	kref_put(&(nb->ref), neighbor_free);
}

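/* Module init helper: create the slab cache for struct conn_retrans. */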
int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);
	if (unlikely(connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

MODULE_LICENSE("GPL");