/* cor.git - net/cor/snd.c ("qos_resume nbcongwin fix") */
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2019 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
25 #include "cor.h"
27 static struct kmem_cache *connretrans_slab;
29 static DEFINE_SPINLOCK(queues_lock);
30 static LIST_HEAD(queues);
32 static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
33 int from_qos);
35 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
36 int caller, int from_nbcongwin_resume);
39 #ifdef DEBUG_QOS_SLOWSEND
40 static DEFINE_SPINLOCK(slowsend_lock);
41 static unsigned long last_send;
44 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
46 int allowsend = 0;
47 unsigned long jiffies_tmp;
48 spin_lock_bh(&slowsend_lock);
49 jiffies_tmp = jiffies;
50 if (last_send != jiffies_tmp) {
51 if (last_send + 1 == jiffies_tmp) {
52 last_send = jiffies_tmp;
53 } else {
54 last_send = jiffies_tmp - 1;
56 allowsend = 1;
58 spin_unlock_bh(&slowsend_lock);
60 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
61 if (allowsend) {
62 return dev_queue_xmit(skb);
63 } else {
64 kfree_skb(skb);
65 return NET_XMIT_DROP;
68 #endif
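/* The DEBUG_QOS_SLOWSEND helper above throttles transmission for debugging:
 * roughly one frame per jiffy gets through (two in the first jiffy after an
 * idle gap); everything else is freed and reported as NET_XMIT_DROP. */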
70 static void free_connretrans(struct kref *ref)
72 struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
73 struct conn *cn = cr->trgt_out_o;
75 BUG_ON(cr->state != CONN_RETRANS_ACKED);
77 kmem_cache_free(connretrans_slab, cr);
78 kref_put(&(cn->ref), free_conn);
81 void free_qos(struct kref *ref)
83 struct qos_queue *q = container_of(ref, struct qos_queue, ref);
84 kfree(q);
88 static void qos_queue_set_congstatus(struct qos_queue *q_locked);
90 /**
91 * neighbor congestion window:
 92  * increment by 4096 every round trip if more than 2/3 of cwin is used
94 * in case of packet loss decrease by 1/4:
95 * - <= 1/8 immediately and
96 * - <= 1/4 during the next round trip
98 * in case of multiple packet loss events, do not decrement more than once per
 99  * round trip
 */
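/* Illustrative sketch, not built: the congestion window above is kept in
 * fixed point, scaled by (1 << NBCONGWIN_SHIFT), so that per-ack increments
 * smaller than one byte still accumulate. Assuming NBCONGWIN_SHIFT == 16
 * purely for this example, and ignoring the overflow guards of the real
 * nbcongwin_update_cwin(), the per-ack increment boils down to:
 *
 *	static __u64 cwin_incr_example(__u64 cwin_fp, __u64 bytes_acked)
 *	{
 *		__u64 cwin_mul = 1ULL << 16;	/ * fixed point scale * /
 *		__u64 incr_per_rtt = 4096;	/ * bytes gained per round trip * /
 *		__u64 cwin_tmp = max(cwin_fp, bytes_acked << 16);
 *
 *		/ * grow in proportion to the fraction of the window acked * /
 *		return div64_u64(bytes_acked * incr_per_rtt * cwin_mul *
 *				cwin_mul, cwin_tmp);
 *	}
 *
 * With a 64 KiB window and 16 KiB acked this yields 1024 bytes (in fixed
 * point), so acking one full window per round trip adds the advertised 4096.
 */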
102 #ifdef COR_NBCONGWIN
104 /*extern __u64 get_bufspace_used(void);
106 static void print_conn_bufstats(struct neighbor *nb)
108 / * not threadsafe, but this is only for debugging... * /
109 __u64 totalsize = 0;
110 __u64 read_remaining = 0;
111 __u32 numconns = 0;
112 struct list_head *lh;
113 unsigned long iflags;
115 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
117 lh = nb->conns_waiting.lh.next;
118 while (lh != &(nb->conns_waiting.lh)) {
119 struct conn *cn = container_of(lh, struct conn,
120 target.out.rb.lh);
121 totalsize += cn->data_buf.datasize;
122 read_remaining += cn->data_buf.read_remaining;
123 lh = lh->next;
126 lh = nb->conns_waiting.lh_nextpass.next;
127 while (lh != &(nb->conns_waiting.lh_nextpass)) {
128 struct conn *cn = container_of(lh, struct conn,
129 target.out.rb.lh);
130 totalsize += cn->data_buf.datasize;
131 read_remaining += cn->data_buf.read_remaining;
132 lh = lh->next;
135 numconns = nb->conns_waiting.cnt;
137 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
139 printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining, numconns);
140 } */
142 static void nbcongwin_data_retransmitted(struct neighbor *nb, __u64 bytes_sent)
144 __u64 cwin;
146 unsigned long iflags;
148 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
150 cwin = atomic64_read(&(nb->nbcongwin.cwin));
152 /* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
153 get_bufspace_used());
154 print_conn_bufstats(nb); */
156 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
157 BUG_ON(cwin >= U64_MAX/1024);
159 if (bytes_sent > 1024)
160 bytes_sent = 1024;
162 if (nb->nbcongwin.cwin_shrinkto == cwin) {
163 if (bytes_sent > 512) {
164 cwin -= cwin/8;
165 } else {
166 cwin -= (bytes_sent * cwin) / (1024 * 4);
168 atomic64_set(&(nb->nbcongwin.cwin), cwin);
171 nb->nbcongwin.cwin_shrinkto -=
172 (bytes_sent * nb->nbcongwin.cwin_shrinkto) / (1024 * 4);
174 nb->nbcongwin.cwin_shrinkto = max(nb->nbcongwin.cwin_shrinkto,
175 cwin - cwin/4);
177 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
180 static __u64 nbcongwin_update_cwin(struct neighbor *nb_cwlocked,
181 __u64 data_intransit, __u64 bytes_acked)
183 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
184 __u32 INCR_PER_RTT = 4096;
186 __u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));
188 __u64 cwin_tmp;
189 __u64 incrby;
191 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
192 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
193 if (unlikely(shrinkby > cwin))
194 cwin = 0;
195 else
196 cwin -= shrinkby;
198 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
199 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
203 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
204 goto out;
206 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
208 if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
209 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
210 cwin_tmp / CWIN_MUL / CWIN_MUL);
211 else if (unlikely(bytes_acked >=
212 U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
213 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
214 cwin_tmp / CWIN_MUL);
215 else
216 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
217 CWIN_MUL, cwin_tmp);
219 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
221 if (unlikely(cwin + incrby < cwin))
222 cwin = U64_MAX;
223 else
224 cwin += incrby;
226 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
227 nb_cwlocked->nbcongwin.cwin_shrinkto))
228 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
229 else
230 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
232 out:
233 atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);
235 return cwin;
238 void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
240 unsigned long iflags;
241 struct qos_queue *q = nb->queue;
242 __u64 data_intransit;
243 __u64 cwin;
245 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
247 data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));
249 cwin = nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
251 BUG_ON(bytes_acked > data_intransit);
252 atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
253 data_intransit -= bytes_acked;
255 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
256 goto out_sendnok;
258 spin_lock(&(q->qlock));
259 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
260 if (nb->conns_waiting.cnt == 0) {
261 nb->rb.in_queue = RB_INQUEUE_FALSE;
262 } else {
263 _qos_enqueue(q, &(nb->rb), QOS_CALLER_NEIGHBOR, 1);
266 spin_unlock(&(q->qlock));
269 out_sendnok:
270 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
273 static void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
275 atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
278 #warning todo do not shrink below mss
279 static int nbcongwin_send_allowed(struct neighbor *nb)
281 unsigned long iflags;
282 int ret = 1;
283 struct qos_queue *q = nb->queue;
284 int krefput_queue = 0;
286 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
287 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
288 return 1;
290 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
292 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
293 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
294 goto out_ok;
296 ret = 0;
298 spin_lock(&(q->qlock));
299 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
300 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
301 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
302 list_del(&(nb->rb.lh));
303 kref_put(&(nb->ref), kreffree_bug);
304 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
305 BUG_ON(q->numconns < nb->conns_waiting.cnt);
306 q->numconns -= nb->conns_waiting.cnt;
307 q->priority_sum -= nb->conns_waiting.priority_sum;
308 krefput_queue = 1;
310 qos_queue_set_congstatus(q);
311 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
312 } else {
313 BUG();
315 spin_unlock(&(q->qlock));
317 if (krefput_queue != 0)
318 kref_put(&(q->ref), free_qos);
320 out_ok:
321 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
323 return ret;
326 #else
328 static inline void nbcongwin_data_retransmitted(struct neighbor *nb,
329 __u64 bytes_sent)
333 static inline void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
337 static inline void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
341 static inline int nbcongwin_send_allowed(struct neighbor *nb)
343 return 1;
346 #endif
348 static __u64 _resume_conns_maxsend(struct qos_queue *q, struct conn *trgt_out_l,
349 __u32 newpriority)
351 unsigned long iflags;
353 struct neighbor *nb = trgt_out_l->target.out.nb;
354 __u32 oldpriority = trgt_out_l->target.out.rb_priority;
355 __u64 priority_sum;
356 __u32 numconns;
358 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
359 spin_lock(&(q->qlock));
361 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
362 BUG_ON(q->priority_sum < oldpriority);
363 nb->conns_waiting.priority_sum -= oldpriority;
364 q->priority_sum -= oldpriority;
366 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
367 nb->conns_waiting.priority_sum);
368 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
369 nb->conns_waiting.priority_sum += newpriority;
370 q->priority_sum += newpriority;
372 priority_sum = q->priority_sum;
373 numconns = q->numconns;
375 spin_unlock(&(q->qlock));
376 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
378 trgt_out_l->target.out.rb_priority = newpriority;
380 return div_u64(1024LL * ((__u64) newpriority) * ((__u64) numconns),
381 priority_sum);
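/* Example of the share computed above, with illustrative numbers: 4 conns
 * waiting on this queue, a priority_sum of 4096 and a newpriority of 2048
 * give 1024 * 2048 * 4 / 4096 == 2048 bytes for this pass, i.e. twice the
 * 1024 byte base share of an average priority conn. */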
384 static int _resume_neighbors_nextpass(struct neighbor *nb_waitingconnslocked)
386 BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);
388 if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
389 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
390 return 1;
393 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
395 nb_waitingconnslocked->conns_waiting.lh.next =
396 nb_waitingconnslocked->conns_waiting.lh_nextpass.next;
397 nb_waitingconnslocked->conns_waiting.lh.prev =
398 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev;
399 nb_waitingconnslocked->conns_waiting.lh.next->prev =
400 &(nb_waitingconnslocked->conns_waiting.lh);
401 nb_waitingconnslocked->conns_waiting.lh.prev->next =
402 &(nb_waitingconnslocked->conns_waiting.lh);
403 nb_waitingconnslocked->conns_waiting.lh_nextpass.next =
404 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
405 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev =
406 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
408 return 0;
411 static int _resume_neighbors(struct qos_queue *q, struct neighbor *nb,
412 int *progress)
414 unsigned long iflags;
416 while (1) {
417 __u32 priority;
418 __u32 maxsend;
420 int rc2;
421 __u32 sent2 = 0;
423 struct conn *cn = 0;
424 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
425 if (list_empty(&(nb->conns_waiting.lh)) != 0) {
426 int done = _resume_neighbors_nextpass(nb);
427 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
428 iflags);
429 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
431 BUG_ON(nb->conns_waiting.cnt == 0);
433 cn = container_of(nb->conns_waiting.lh.next, struct conn,
434 target.out.rb.lh);
435 BUG_ON(cn->targettype != TARGET_OUT);
436 BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
437 BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
438 (nb->conns_waiting.lh.prev !=
439 &(cn->target.out.rb.lh)));
440 list_del(&(cn->target.out.rb.lh));
441 list_add_tail(&(cn->target.out.rb.lh),
442 &(nb->conns_waiting.lh_nextpass));
443 kref_get(&(cn->ref));
444 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
447 priority = refresh_conn_priority(cn, 0);
449 spin_lock_bh(&(cn->rcv_lock));
451 if (unlikely(cn->targettype != TARGET_OUT)) {
452 spin_unlock_bh(&(cn->rcv_lock));
453 continue;
456 maxsend = _resume_conns_maxsend(q, cn, priority);
457 maxsend += cn->target.out.maxsend_extra;
458 if (unlikely(maxsend > U32_MAX))
459 maxsend = U32_MAX;
461 rc2 = _flush_out(cn, maxsend, &sent2, 1);
463 if (rc2 == RC_FLUSH_CONN_OUT_OK ||
464 rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
465 cn->target.out.maxsend_extra = 0;
466 qos_remove_conn(cn);
467 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
468 rc2 == RC_FLUSH_CONN_OUT_OOM)) {
469 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
470 if (likely(cn->target.out.rb.in_queue !=
471 RB_INQUEUE_FALSE)) {
472 list_del(&(cn->target.out.rb.lh));
473 list_add(&(cn->target.out.rb.lh),
474 &(nb->conns_waiting.lh));
476 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
477 iflags);
478 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
479 rc2 == RC_FLUSH_CONN_OUT_OOM) {
480 cn->target.out.maxsend_extra = 0;
481 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
482 if (unlikely(maxsend - sent2 > 65535))
483 cn->target.out.maxsend_extra = 65535;
484 else
485 cn->target.out.maxsend_extra = maxsend - sent2;
488 spin_unlock_bh(&(cn->rcv_lock));
490 if (sent2 != 0) {
491 *progress = 1;
492 wake_sender(cn);
495 kref_put(&(cn->ref), free_conn);
497 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
498 rc2 == RC_FLUSH_CONN_OUT_OOM) {
499 return QOS_RESUME_CONG;
504 static int resume_neighbors(struct qos_queue *q, int *sent)
506 unsigned long iflags;
508 spin_lock_irqsave(&(q->qlock), iflags);
510 while (1) {
511 struct neighbor *nb;
512 int rc;
514 if (list_empty(&(q->neighbors_waiting)) != 0) {
515 BUG_ON(q->numconns != 0);
516 spin_unlock_irqrestore(&(q->qlock), iflags);
517 return QOS_RESUME_DONE;
519 BUG_ON(q->numconns == 0);
521 nb = container_of(q->neighbors_waiting.next, struct neighbor,
522 rb.lh);
524 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
525 BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
526 BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
527 (q->neighbors_waiting.prev != &(nb->rb.lh)));
529 kref_get(&(nb->ref));
531 spin_unlock_irqrestore(&(q->qlock), iflags);
533 atomic_set(&(nb->cmsg_delay_conndata), 1);
535 rc = _resume_neighbors(q, nb, sent);
536 if (rc == QOS_RESUME_CONG) {
537 kref_put(&(nb->ref), neighbor_free);
538 return QOS_RESUME_CONG;
541 #warning todo remove cmsg_task, replace with call from qos_queue
542 atomic_set(&(nb->cmsg_delay_conndata), 0);
543 spin_lock_bh(&(nb->cmsg_lock));
544 schedule_controlmsg_timer(nb);
545 spin_unlock_bh(&(nb->cmsg_lock));
547 spin_lock_irqsave(&(q->qlock), iflags);
548 if (rc == QOS_RESUME_DONE) {
549 if (nb->conns_waiting.cnt == 0 &&
550 nb->rb.in_queue == RB_INQUEUE_TRUE) {
551 nb->rb.in_queue = RB_INQUEUE_FALSE;
552 list_del(&(nb->rb.lh));
553 kref_put(&(nb->ref), kreffree_bug);
555 } else if (rc == QOS_RESUME_NEXTNEIGHBOR) {
556 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
557 list_del(&(nb->rb.lh));
558 list_add_tail(&(nb->rb.lh),
559 &(q->neighbors_waiting));
561 } else {
562 BUG();
565 kref_put(&(nb->ref), neighbor_free);
567 if (rc == QOS_RESUME_NEXTNEIGHBOR) {
568 spin_unlock_irqrestore(&(q->qlock), iflags);
569 return QOS_RESUME_NEXTNEIGHBOR;
574 static int send_retrans(struct neighbor *nb, int fromqos, int *sent);
576 static int _qos_resume(struct qos_queue *q, int caller, int *sent)
578 unsigned long iflags;
579 int rc = QOS_RESUME_DONE;
580 struct list_head *lh;
582 spin_lock_irqsave(&(q->qlock), iflags);
584 if (caller == QOS_CALLER_KPACKET)
585 lh = &(q->conn_retrans_waiting);
586 else if (caller == QOS_CALLER_CONN_RETRANS)
587 lh = &(q->kpackets_waiting);
588 else if (caller == QOS_CALLER_ANNOUNCE)
589 lh = &(q->announce_waiting);
590 else
591 BUG();
593 while (list_empty(lh) == 0) {
594 struct list_head *curr = lh->next;
595 struct resume_block *rb = container_of(curr,
596 struct resume_block, lh);
597 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
598 rb->in_queue = RB_INQUEUE_FALSE;
599 list_del(curr);
601 spin_unlock_irqrestore(&(q->qlock), iflags);
602 if (caller == QOS_CALLER_KPACKET) {
603 rc = send_messages(container_of(rb, struct neighbor,
604 rb_kp), 1, sent);
605 } else if (caller == QOS_CALLER_CONN_RETRANS) {
606 rc = send_retrans(container_of(rb, struct neighbor,
607 rb_cr), 1, sent);
608 } else if (caller == QOS_CALLER_ANNOUNCE) {
609 rc = _send_announce(container_of(rb,
610 struct announce_data, rb), 1, sent);
611 } else {
612 BUG();
614 spin_lock_irqsave(&(q->qlock), iflags);
616 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
617 rb->in_queue = RB_INQUEUE_TRUE;
618 list_add(curr, lh);
619 break;
622 if (caller == QOS_CALLER_KPACKET) {
623 kref_put(&(container_of(rb, struct neighbor,
624 rb_kp)->ref), neighbor_free);
625 } else if (caller == QOS_CALLER_CONN_RETRANS) {
626 kref_put(&(container_of(rb, struct neighbor,
627 rb_cr)->ref), neighbor_free);
628 } else if (caller == QOS_CALLER_ANNOUNCE) {
629 kref_put(&(container_of(rb,
630 struct announce_data, rb)->ref),
631 announce_data_free);
632 } else {
633 BUG();
636 kref_put(&(q->ref), kreffree_bug);
639 spin_unlock_irqrestore(&(q->qlock), iflags);
641 return rc;
644 void qos_resume_taskfunc(unsigned long arg)
646 struct qos_queue *q = (struct qos_queue *) arg;
648 int rc = QOS_RESUME_DONE;
649 int sent = 0;
650 unsigned long iflags;
651 int i;
653 #warning todo limit runtime of resume task
655 spin_lock_irqsave(&(q->qlock), iflags);
657 for (i=0;i<4;i++) {
658 struct list_head *lh;
660 if (i == QOS_CALLER_KPACKET)
661 lh = &(q->conn_retrans_waiting);
662 else if (i == QOS_CALLER_CONN_RETRANS)
663 lh = &(q->kpackets_waiting);
664 else if (i == QOS_CALLER_ANNOUNCE)
665 lh = &(q->announce_waiting);
666 else if (i == QOS_CALLER_NEIGHBOR)
667 lh = &(q->neighbors_waiting);
668 else
669 BUG();
671 if (list_empty(lh))
672 continue;
674 spin_unlock_irqrestore(&(q->qlock), iflags);
675 if (i == QOS_CALLER_NEIGHBOR) {
676 rc = resume_neighbors(q, &sent);
677 } else {
678 rc = _qos_resume(q, i, &sent);
681 spin_lock_irqsave(&(q->qlock), iflags);
683 i = 0;
685 if (rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR)
686 break;
689 if (rc == QOS_RESUME_DONE) {
690 q->qos_resume_scheduled = 0;
691 } else {
692 unsigned long jiffies_tmp = jiffies;
693 unsigned long delay = (jiffies_tmp - q->jiffies_lastprogress +
694 3) / 4;
696 if (sent || unlikely(delay <= 0)) {
697 q->jiffies_lastprogress = jiffies_tmp;
698 delay = 1;
699 } else if (delay > HZ/10) {
700 q->jiffies_lastprogress = jiffies_tmp - (HZ*4)/10;
701 delay = HZ/10;
704 /* If we retry too fast here, we might starve layer 2 */
705 mod_timer(&(q->qos_resume_timer), jiffies_tmp + delay);
706 kref_get(&(q->ref));
709 qos_queue_set_congstatus(q);
711 spin_unlock_irqrestore(&(q->qlock), iflags);
714 static inline int qos_queue_is_destroyed(struct qos_queue *q_locked)
716 return q_locked->dev == 0;
719 #warning todo kref (kref_put if tasklet is scheduled)
720 void qos_resume_timerfunc(struct timer_list *qos_resume_timer)
722 unsigned long iflags;
723 struct qos_queue *q = container_of(qos_resume_timer,
724 struct qos_queue, qos_resume_timer);
725 spin_lock_irqsave(&(q->qlock), iflags);
726 if (likely(!qos_queue_is_destroyed(q)))
727 tasklet_schedule(&(q->qos_resume_task));
728 spin_unlock_irqrestore(&(q->qlock), iflags);
730 kref_put(&(q->ref), free_qos);
733 struct qos_queue *get_queue(struct net_device *dev)
735 struct qos_queue *ret = 0;
736 struct list_head *curr;
738 spin_lock_bh(&(queues_lock));
739 curr = queues.next;
740 while (curr != (&queues)) {
741 struct qos_queue *q = container_of(curr,
742 struct qos_queue, queue_list);
743 if (q->dev == dev) {
744 ret = q;
745 kref_get(&(ret->ref));
746 break;
748 curr = curr->next;
750 spin_unlock_bh(&(queues_lock));
751 return ret;
754 static void _destroy_queue(struct qos_queue *q, int caller)
756 struct list_head *lh;
758 if (caller == QOS_CALLER_KPACKET)
759 lh = &(q->conn_retrans_waiting);
760 else if (caller == QOS_CALLER_CONN_RETRANS)
761 lh = &(q->kpackets_waiting);
762 else if (caller == QOS_CALLER_ANNOUNCE)
763 lh = &(q->announce_waiting);
764 else if (caller == QOS_CALLER_NEIGHBOR)
765 lh = &(q->neighbors_waiting);
766 else
767 BUG();
769 while (list_empty(lh) == 0) {
770 struct list_head *curr = lh->next;
771 struct resume_block *rb = container_of(curr,
772 struct resume_block, lh);
773 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
774 rb->in_queue = RB_INQUEUE_FALSE;
775 list_del(curr);
777 if (caller == QOS_CALLER_KPACKET) {
778 kref_put(&(container_of(rb, struct neighbor,
779 rb_kp)->ref), neighbor_free);
780 } else if (caller == QOS_CALLER_CONN_RETRANS) {
781 kref_put(&(container_of(rb, struct neighbor,
782 rb_cr)->ref), neighbor_free);
783 } else if (caller == QOS_CALLER_ANNOUNCE) {
784 kref_put(&(container_of(rb,
785 struct announce_data, rb)->ref),
786 announce_data_free);
787 } else if (caller == QOS_CALLER_NEIGHBOR) {
788 kref_put(&(container_of(rb,
789 struct neighbor, rb)->ref),
790 neighbor_free);
791 } else {
792 BUG();
794 kref_put(&(q->ref), kreffree_bug);
798 static struct qos_queue *unlink_queue(struct net_device *dev)
800 struct qos_queue *ret = 0;
801 struct list_head *curr;
803 spin_lock_bh(&(queues_lock));
804 curr = queues.next;
805 while (curr != (&queues)) {
806 struct qos_queue *q = container_of(curr,
807 struct qos_queue, queue_list);
808 if (dev == 0 || q->dev == dev) {
809 ret = q;
810 kref_get(&(ret->ref));
812 list_del(&(q->queue_list));
813 kref_put(&(q->ref), kreffree_bug);
814 break;
816 curr = curr->next;
818 spin_unlock_bh(&(queues_lock));
819 return ret;
822 int destroy_queue(struct net_device *dev)
824 int rc = 1;
825 unsigned long iflags;
827 while (1) {
828 struct qos_queue *q = unlink_queue(dev);
830 if (q == 0)
831 break;
833 rc = 0;
835 spin_lock_irqsave(&(q->qlock), iflags);
836 if (q->dev != 0) {
837 dev_put(q->dev);
838 q->dev = 0;
840 _destroy_queue(q, QOS_CALLER_KPACKET);
841 _destroy_queue(q, QOS_CALLER_CONN_RETRANS);
842 _destroy_queue(q, QOS_CALLER_ANNOUNCE);
843 _destroy_queue(q, QOS_CALLER_NEIGHBOR);
844 spin_unlock_irqrestore(&(q->qlock), iflags);
846 tasklet_kill(&(q->qos_resume_task));
848 kref_put(&(q->ref), free_qos);
851 return rc;
854 int create_queue(struct net_device *dev)
856 struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);
858 if (q == 0) {
859 printk(KERN_ERR "cor: unable to allocate memory for device "
860 "queue, not enabling device");
861 return 1;
864 memset(q, 0, sizeof(struct qos_queue));
866 spin_lock_init(&(q->qlock));
868 kref_init(&(q->ref));
870 q->dev = dev;
871 dev_hold(dev);
873 timer_setup(&(q->qos_resume_timer), qos_resume_timerfunc, 0);
874 tasklet_init(&(q->qos_resume_task), qos_resume_taskfunc,
875 (unsigned long) q);
877 INIT_LIST_HEAD(&(q->kpackets_waiting));
878 INIT_LIST_HEAD(&(q->conn_retrans_waiting));
879 INIT_LIST_HEAD(&(q->announce_waiting));
880 INIT_LIST_HEAD(&(q->neighbors_waiting));
882 atomic_set(&(q->cong_status), 0);
884 spin_lock_bh(&(queues_lock));
885 list_add(&(q->queue_list), &queues);
886 spin_unlock_bh(&(queues_lock));
888 return 0;
891 static void qos_queue_set_congstatus(struct qos_queue *q_locked)
893 __u32 newstatus;
895 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
896 newstatus = CONGSTATUS_NONE;
897 } else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
898 newstatus = CONGSTATUS_KPACKETS;
899 } else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
900 newstatus = CONGSTATUS_RETRANS;
901 } else if (list_empty(&(q_locked->announce_waiting)) == 0) {
902 newstatus = CONGSTATUS_ANNOUNCE;
903 } else if (list_empty(&(q_locked->neighbors_waiting)) == 0) {
904 newstatus = CONGSTATUS_CONNDATA;
905 } else {
906 newstatus = CONGSTATUS_NONE;
909 atomic_set(&(q_locked->cong_status), newstatus);
912 void qos_set_lastdrop(struct qos_queue *q)
914 unsigned long iflags;
916 spin_lock_irqsave(&(q->qlock), iflags);
917 q->jiffies_lastdrop = jiffies;
918 qos_queue_set_congstatus(q);
919 spin_unlock_irqrestore(&(q->qlock), iflags);
923 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
924 * caller
926 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
927 int caller, int from_nbcongwin_resume)
929 int queues_empty;
931 if (rb->in_queue == RB_INQUEUE_TRUE) {
932 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
933 return;
934 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
935 from_nbcongwin_resume == 0) {
936 return;
939 if (unlikely(qos_queue_is_destroyed(q)))
940 return;
942 queues_empty = list_empty(&(q->kpackets_waiting)) &&
943 list_empty(&(q->conn_retrans_waiting)) &&
944 list_empty(&(q->announce_waiting)) &&
945 list_empty(&(q->neighbors_waiting));
947 rb->in_queue = RB_INQUEUE_TRUE;
949 if (caller == QOS_CALLER_KPACKET) {
950 list_add(&(rb->lh) , &(q->conn_retrans_waiting));
951 kref_get(&(container_of(rb, struct neighbor, rb_kp)->ref));
952 } else if (caller == QOS_CALLER_CONN_RETRANS) {
953 list_add(&(rb->lh), &(q->kpackets_waiting));
954 kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
955 } else if (caller == QOS_CALLER_ANNOUNCE) {
956 list_add(&(rb->lh), &(q->announce_waiting));
957 kref_get(&(container_of(rb, struct announce_data, rb)->ref));
958 } else if (caller == QOS_CALLER_NEIGHBOR) {
959 struct neighbor *nb = container_of(rb, struct neighbor, rb);
960 list_add(&(rb->lh), &(q->neighbors_waiting));
961 kref_get(&(nb->ref));
962 BUG_ON(nb->conns_waiting.cnt == 0);
963 q->numconns += nb->conns_waiting.cnt;
964 q->priority_sum += nb->conns_waiting.priority_sum;
965 } else {
966 BUG();
968 kref_get(&(q->ref));
970 if (queues_empty && q->qos_resume_scheduled == 0) {
971 q->jiffies_lastprogress = jiffies;
972 q->qos_resume_scheduled = 1;
973 if (from_nbcongwin_resume) {
974 tasklet_schedule(&(q->qos_resume_task));
975 } else {
976 mod_timer(&(q->qos_resume_timer), jiffies + 1);
977 kref_get(&(q->ref));
981 qos_queue_set_congstatus(q);
984 void qos_enqueue(struct qos_queue *q, struct resume_block *rb, int caller)
986 unsigned long iflags;
988 spin_lock_irqsave(&(q->qlock), iflags);
989 _qos_enqueue(q, rb, caller, 0);
990 spin_unlock_irqrestore(&(q->qlock), iflags);
993 void qos_remove_conn(struct conn *trgt_out_lx)
995 unsigned long iflags;
996 struct neighbor *nb = trgt_out_lx->target.out.nb;
997 struct qos_queue *q = nb->queue;
998 int sched_cmsg = 0;
999 int krefput_nb = 0;
1001 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1002 BUG_ON(q == 0);
1004 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1005 if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
1006 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1007 return;
1009 spin_lock(&(q->qlock));
1011 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
1012 list_del(&(trgt_out_lx->target.out.rb.lh));
1013 BUG_ON(nb->conns_waiting.cnt == 0);
1014 nb->conns_waiting.cnt--;
1015 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1016 BUG_ON(q->numconns == 0);
1017 q->numconns--;
1020 BUG_ON(nb->conns_waiting.priority_sum <
1021 trgt_out_lx->target.out.rb_priority);
1022 BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
1023 nb->conns_waiting.priority_sum -=
1024 trgt_out_lx->target.out.rb_priority;
1025 q->priority_sum -= trgt_out_lx->target.out.rb_priority;
1026 trgt_out_lx->target.out.rb_priority = 0;
1028 if (list_empty(&(nb->conns_waiting.lh)) &&
1029 list_empty(&(nb->conns_waiting.lh_nextpass))) {
1030 BUG_ON(nb->conns_waiting.priority_sum != 0);
1031 BUG_ON(nb->conns_waiting.cnt != 0);
1032 } else {
1033 BUG_ON(nb->conns_waiting.cnt == 0);
1036 if (list_empty(&(nb->conns_waiting.lh)) &&
1037 list_empty(&(nb->conns_waiting.lh_nextpass)) &&
1038 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1039 nb->rb.in_queue = RB_INQUEUE_FALSE;
1040 list_del(&(nb->rb.lh));
1041 if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
1042 atomic_set(&(nb->cmsg_delay_conndata), 0);
1043 sched_cmsg = 1;
1046 krefput_nb = 1;
1048 BUG_ON(list_empty(&(q->neighbors_waiting)) && q->numconns != 0);
1049 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1050 q->priority_sum != 0);
1052 qos_queue_set_congstatus(q);
1055 spin_unlock(&(q->qlock));
1056 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1058 if (sched_cmsg) {
1059 spin_lock_bh(&(nb->cmsg_lock));
1060 schedule_controlmsg_timer(nb);
1061 spin_unlock_bh(&(nb->cmsg_lock));
1064 kref_put(&(trgt_out_lx->ref), kreffree_bug);
1066 if (krefput_nb)
1067 kref_put(&(nb->ref), neighbor_free);
1070 static void qos_enqueue_conn(struct conn *trgt_out_lx)
1072 unsigned long iflags;
1073 struct neighbor *nb = trgt_out_lx->target.out.nb;
1074 struct qos_queue *q;
1076 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1078 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1080 if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
1081 goto out;
1083 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
1084 list_add(&(trgt_out_lx->target.out.rb.lh), &(nb->conns_waiting.lh));
1085 kref_get(&(trgt_out_lx->ref));
1086 nb->conns_waiting.cnt++;
1088 q = trgt_out_lx->target.out.nb->queue;
1089 spin_lock(&(q->qlock));
1090 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1091 q->numconns++;
1092 } else {
1093 _qos_enqueue(q, &(nb->rb), QOS_CALLER_NEIGHBOR, 0);
1095 spin_unlock(&(q->qlock));
1097 out:
1098 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1101 static struct sk_buff *create_packet(struct neighbor *nb, int size,
1102 gfp_t alloc_flags)
1104 struct sk_buff *ret;
1106 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1107 nb->dev->needed_tailroom, alloc_flags);
1108 if (unlikely(ret == 0))
1109 return 0;
1111 ret->protocol = htons(ETH_P_COR);
1112 ret->dev = nb->dev;
1114 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1115 if(unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1116 nb->dev->dev_addr, ret->len) < 0))
1117 { kfree_skb(ret); return 0; } /* do not leak the freshly allocated skb */
1118 skb_reset_network_header(ret);
1120 return ret;
1123 struct sk_buff *create_packet_cmsg(struct neighbor *nb, int size,
1124 gfp_t alloc_flags, __u64 seqno)
1126 struct sk_buff *ret;
1127 char *dest;
1129 ret = create_packet(nb, size + 7, alloc_flags);
1130 if (unlikely(ret == 0))
1131 return 0;
1133 dest = skb_put(ret, 7);
1134 BUG_ON(dest == 0);
1136 dest[0] = PACKET_TYPE_CMSG;
1137 dest += 1;
1139 put_u48(dest, seqno);
1140 dest += 6;
1142 return ret;
1145 struct sk_buff *create_packet_conndata(struct neighbor *nb, int size,
1146 gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
1147 __u8 snd_delayed_lowbuf, __u8 flush)
1149 struct sk_buff *ret;
1150 char *dest;
1152 ret = create_packet(nb, size + 11, alloc_flags);
1153 if (unlikely(ret == 0))
1154 return 0;
1156 dest = skb_put(ret, 11);
1157 BUG_ON(dest == 0);
1159 if (flush != 0) {
1160 if (snd_delayed_lowbuf != 0) {
1161 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
1162 } else {
1163 dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
1165 } else {
1166 if (snd_delayed_lowbuf != 0) {
1167 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
1168 } else {
1169 dest[0] = PACKET_TYPE_CONNDATA;
1172 dest += 1;
1174 put_u32(dest, conn_id);
1175 dest += 4;
1176 put_u48(dest, seqno);
1177 dest += 6;
1179 return ret;
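/* Header layout written by create_packet_conndata() above: 1 byte packet
 * type (the *_FLUSH and *_LOWBUFDELAYED variants encode the flush and
 * snd_delayed_lowbuf flags), 4 bytes conn_id, 6 bytes seqno - 11 bytes in
 * total, followed by the payload the caller adds with skb_put(). */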
/*
1183  * warning: all callers must do the following calls in this order:
1184 * kref_get
1185 * spin_lock
1186 * reschedule_conn_retrans_timer
1187 * spin_unlock
1188  * kref_put
1190  * This is because this function calls kref_put
 */
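/* Sketch of the calling convention described above, with a hypothetical
 * caller:
 *
 *	kref_get(&(nb->ref));
 *	spin_lock_bh(&(nb->retrans_conn_lock));
 *	reschedule_conn_retrans_timer(nb);
 *	spin_unlock_bh(&(nb->retrans_conn_lock));
 *	kref_put(&(nb->ref), neighbor_free);
 *
 * Holding the extra reference keeps nb valid for the whole locked sequence
 * and for the unlock that follows it. */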
1192 void reschedule_conn_retrans_timer(struct neighbor *nb_retransconnlocked)
1194 struct conn_retrans *cr = 0;
1196 if (list_empty(&(nb_retransconnlocked->retrans_conn_list)))
1197 return;
1199 if (nb_retransconnlocked->retrans_conn_running != 0)
1200 return;
1202 if (nb_retransconnlocked->retrans_conn_timer_running == 0) {
1203 nb_retransconnlocked->retrans_conn_timer_running = 1;
1204 kref_get(&(nb_retransconnlocked->ref));
1207 cr = container_of(nb_retransconnlocked->retrans_conn_list.next,
1208 struct conn_retrans, timeout_list);
1210 mod_timer(&(nb_retransconnlocked->retrans_conn_timer), cr->timeout);
1214 * warning:
1215 * caller must also call kref_get/put, see reschedule_conn_retrans_timer
1217 static void cancel_conn_retrans(struct neighbor *nb_retransconnlocked,
1218 struct conn *trgt_out_lx, struct conn_retrans *cr,
1219 __u64 *bytes_acked)
1221 if (unlikely(cr->state == CONN_RETRANS_ACKED))
1222 return;
1224 if (cr->state == CONN_RETRANS_SCHEDULED) {
1225 list_del(&(cr->timeout_list));
1226 } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
1227 BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
1228 if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
1229 trgt_out_lx->target.out.retrans_lowwindow--;
1232 if (cr->state != CONN_RETRANS_INITIAL)
1233 *bytes_acked += cr->length;
1235 list_del(&(cr->conn_list));
1236 cr->state = CONN_RETRANS_ACKED;
1238 kref_put(&(cr->ref), free_connretrans);
/*
1242  * nb->retrans_conn_lock must be held when calling this
1243  * (see schedule_retransmit_conn())
 */
1245 static void cancel_acked_conn_retrans(struct conn *trgt_out_l,
1246 __u64 *bytes_acked)
1248 __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;
1250 while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
1251 struct conn_retrans *cr = container_of(
1252 trgt_out_l->target.out.retrans_list.next,
1253 struct conn_retrans, conn_list);
1255 if (seqno_after(cr->seqno + cr->length, seqno_acked)) {
1256 if (seqno_before(cr->seqno, seqno_acked)) {
1257 *bytes_acked += seqno_clean(seqno_acked -
1258 cr->seqno);
1259 cr->length -= seqno_clean(seqno_acked -
1260 cr->seqno);
1261 cr->seqno = seqno_acked;
1263 break;
1266 cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l, cr,
1267 bytes_acked);
1270 reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
1273 void cancel_all_conn_retrans(struct conn *trgt_out_lx)
1275 struct neighbor *nb = trgt_out_lx->target.out.nb;
1276 __u64 bytes_acked = 0;
1278 spin_lock_bh(&(nb->retrans_conn_lock));
1280 while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
1281 struct conn_retrans *cr = container_of(
1282 trgt_out_lx->target.out.retrans_list.next,
1283 struct conn_retrans, conn_list);
1284 BUG_ON(cr->trgt_out_o != trgt_out_lx);
1286 cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
1289 reschedule_conn_retrans_timer(nb);
1291 spin_unlock_bh(&(nb->retrans_conn_lock));
1293 if (bytes_acked > 0)
1294 nbcongwin_data_acked(nb, bytes_acked);
1297 static void cancel_all_conn_retrans_nb(struct neighbor *nb)
1299 __u64 bytes_acked = 0;
1301 while (1) {
1302 struct conn_retrans *cr;
1304 spin_lock_bh(&(nb->retrans_conn_lock));
1306 if (list_empty(&(nb->retrans_conn_list))) {
1307 spin_unlock_bh(&(nb->retrans_conn_lock));
1308 break;
1311 cr = container_of(nb->retrans_conn_list.next,
1312 struct conn_retrans, timeout_list);
1314 kref_get(&(cr->ref));
1316 spin_unlock_bh(&(nb->retrans_conn_lock));
1319 spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
1320 spin_lock_bh(&(nb->retrans_conn_lock));
1322 if (likely(cr == container_of(nb->retrans_conn_list.next,
1323 struct conn_retrans, timeout_list)))
1324 cancel_conn_retrans(nb, cr->trgt_out_o, cr,
1325 &bytes_acked);
1327 spin_unlock_bh(&(nb->retrans_conn_lock));
1328 spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));
1330 kref_put(&(cr->ref), free_connretrans);
1333 if (bytes_acked > 0)
1334 nbcongwin_data_acked(nb, bytes_acked);
1337 static struct conn_retrans *prepare_conn_retrans(struct conn *trgt_out_l,
1338 __u64 seqno, __u32 len, __u8 snd_delayed_lowbuf,
1339 struct conn_retrans *cr_splitted, int retransconnlocked)
1341 struct neighbor *nb = trgt_out_l->target.out.nb;
1343 struct conn_retrans *cr = kmem_cache_alloc(connretrans_slab,
1344 GFP_ATOMIC);
1346 if (unlikely(cr == 0))
1347 return 0;
1349 BUG_ON(trgt_out_l->isreset != 0);
1351 memset(cr, 0, sizeof (struct conn_retrans));
1352 cr->trgt_out_o = trgt_out_l;
1353 kref_get(&(trgt_out_l->ref));
1354 cr->seqno = seqno;
1355 cr->length = len;
1356 cr->snd_delayed_lowbuf = snd_delayed_lowbuf;
1357 kref_init(&(cr->ref));
1359 kref_get(&(cr->ref));
1360 if (retransconnlocked == 0)
1361 spin_lock_bh(&(nb->retrans_conn_lock));
1363 if (cr_splitted != 0)
1364 list_add(&(cr->conn_list), &(cr_splitted->conn_list));
1365 else
1366 list_add_tail(&(cr->conn_list),
1367 &(cr->trgt_out_o->target.out.retrans_list));
1369 if (retransconnlocked == 0)
1370 spin_unlock_bh(&(nb->retrans_conn_lock));
1372 return cr;
1375 #define RC_SENDRETRANS_OK 0
1376 #define RC_SENDRETRANS_OOM 1
1377 #define RC_SENDRETRANS_QUEUEFULL 2
1378 #define RC_SENDRETRANS_QUEUEFULLDROPPED 3
1380 static int __send_retrans(struct neighbor *nb, struct conn *trgt_out_l,
1381 struct conn_retrans *cr, __u64 *bytes_sent)
1383 __u8 flush = 0;
1385 BUG_ON(cr->length == 0);
1387 if (trgt_out_l->flush != 0 && seqno_eq(cr->seqno + cr->length,
1388 trgt_out_l->target.out.seqno_nextsend) &&
1389 trgt_out_l->data_buf.read_remaining == 0)
1390 flush = 1;
1392 if (send_conndata_as_skb(nb, cr->length)) {
1393 struct sk_buff *skb;
1394 char *dst;
1395 int rc;
1397 skb = create_packet_conndata(nb, cr->length, GFP_ATOMIC,
1398 trgt_out_l->target.out.conn_id, cr->seqno,
1399 cr->snd_delayed_lowbuf, flush);
1400 if (unlikely(skb == 0))
1401 return RC_SENDRETRANS_OOM;
1403 dst = skb_put(skb, cr->length);
1405 databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);
1407 rc = cor_dev_queue_xmit(skb, nb->queue,
1408 QOS_CALLER_CONN_RETRANS);
1409 if (rc == NET_XMIT_DROP)
1410 return RC_SENDRETRANS_QUEUEFULLDROPPED;
1411 schedule_retransmit_conn(cr, 1, 0);
1412 if (rc != NET_XMIT_SUCCESS)
1413 return RC_SENDRETRANS_QUEUEFULL;
1415 } else {
1416 struct control_msg_out *cm;
1417 char *buf;
1419 buf = kmalloc(cr->length, GFP_ATOMIC);
1420 if (unlikely(buf == 0))
1421 return RC_SENDRETRANS_OOM;
1423 cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
1424 if (unlikely(cm == 0)) {
1425 kfree(buf);
1426 return RC_SENDRETRANS_OOM;
1429 databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);
1431 send_conndata(cm, trgt_out_l->target.out.conn_id,
1432 cr->seqno, buf, buf, cr->length,
1433 cr->snd_delayed_lowbuf, flush,
1434 trgt_out_l->is_highlatency, cr);
1437 *bytes_sent += cr->length;
1439 return RC_SENDRETRANS_OK;
1442 static int _send_retrans_splitcr_ifneeded(struct neighbor *nb_retransconnlocked,
1443 struct conn *trgt_out_l, struct conn_retrans *cr)
1445 __u32 targetmss = mss_conndata(nb_retransconnlocked);
1446 __u64 windowlimit = seqno_clean(
1447 trgt_out_l->target.out.seqno_windowlimit -
1448 cr->seqno);
1449 __u32 maxsize = targetmss;
1450 if (windowlimit < maxsize)
1451 maxsize = windowlimit;
1453 if (unlikely(cr->length > maxsize)) {
1454 struct conn_retrans *cr2 = prepare_conn_retrans(trgt_out_l,
1455 cr->seqno + maxsize, cr->length - maxsize,
1456 cr->snd_delayed_lowbuf, cr, 1);
1457 if (unlikely(cr2 == 0))
1458 return RC_SENDRETRANS_OOM;
1460 cr2->timeout = cr->timeout;
1462 list_add(&(cr2->timeout_list),
1463 &(nb_retransconnlocked->retrans_conn_list));
1464 cr2->state = CONN_RETRANS_SCHEDULED;
1466 cr->length = maxsize;
1469 return RC_SENDRETRANS_OK;
1472 static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr,
1473 __u64 *bytes_sent)
1476 struct conn *trgt_out_o = cr->trgt_out_o;
1477 int rc = RC_SENDRETRANS_OK;
1479 spin_lock_bh(&(trgt_out_o->rcv_lock));
1481 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1482 BUG_ON(trgt_out_o->target.out.nb != nb);
1484 spin_lock_bh(&(nb->retrans_conn_lock));
1485 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1486 spin_unlock_bh(&(nb->retrans_conn_lock));
1487 goto out;
1490 BUG_ON(trgt_out_o->isreset != 0);
1492 BUG_ON(seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));
1494 if (seqno_after_eq(cr->seqno,
1495 trgt_out_o->target.out.seqno_windowlimit)) {
1496 BUG_ON(cr->state != CONN_RETRANS_SENDING);
1497 cr->state = CONN_RETRANS_LOWWINDOW;
1498 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1499 trgt_out_o->target.out.retrans_lowwindow++;
1501 spin_unlock_bh(&(nb->retrans_conn_lock));
1502 goto out;
1505 rc = _send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);
1507 spin_unlock_bh(&(nb->retrans_conn_lock));
1509 kref_get(&(trgt_out_o->ref));
1511 if (rc == RC_SENDRETRANS_OK)
1512 rc = __send_retrans(nb, trgt_out_o, cr, bytes_sent);
1514 if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
1515 spin_lock_bh(&(nb->retrans_conn_lock));
1516 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1517 } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
1518 if (rc == RC_SENDRETRANS_OOM)
1519 cr->timeout = jiffies + 1;
1520 list_add(&(cr->timeout_list), &(nb->retrans_conn_list));
1521 cr->state = CONN_RETRANS_SCHEDULED;
1522 } else {
1523 BUG();
1525 spin_unlock_bh(&(nb->retrans_conn_lock));
1528 out:
1529 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1531 kref_put(&(trgt_out_o->ref), free_conn);
1533 return (rc == RC_SENDRETRANS_OOM ||
1534 rc == RC_SENDRETRANS_QUEUEFULL ||
1535 rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
1538 static int send_retrans(struct neighbor *nb, int fromqos, int *sent)
1540 int queuefull = 0;
1541 int nbstate = get_neigh_state(nb);
1542 __u64 bytes_sent = 0;
1543 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
1544 goto out;
1545 } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1547 * cancel_all_conn_retrans_nb should not be needed, because
1548 * reset_all_conns calls cancel_all_conn_retrans
1550 cancel_all_conn_retrans_nb(nb);
1551 return QOS_RESUME_DONE;
1554 while (1) {
1555 struct conn_retrans *cr = 0;
1557 if (qos_fastsend_allowed_conn_retrans(nb) == 0)
1558 goto qos_enqueue;
1560 spin_lock_bh(&(nb->retrans_conn_lock));
1562 if (list_empty(&(nb->retrans_conn_list)))
1563 break;
1565 cr = container_of(nb->retrans_conn_list.next,
1566 struct conn_retrans, timeout_list);
1568 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1570 if (time_after(cr->timeout, jiffies))
1571 break;
1573 kref_get(&(cr->ref));
1574 list_del(&(cr->timeout_list));
1575 cr->state = CONN_RETRANS_SENDING;
1577 spin_unlock_bh(&(nb->retrans_conn_lock));
1578 queuefull = _send_retrans(nb, cr, &bytes_sent);
1579 kref_put(&(cr->ref), free_connretrans);
1580 if (queuefull) {
1581 qos_enqueue:
1582 if (fromqos == 0)
1583 qos_enqueue(nb->queue, &(nb->rb_cr),
1584 QOS_CALLER_CONN_RETRANS);
1585 goto out;
1586 } else {
1587 *sent = 1;
1591 if (0) {
1592 out:
1593 spin_lock_bh(&(nb->retrans_conn_lock));
1596 if (queuefull == 0) {
1597 nb->retrans_conn_running = 0;
1598 reschedule_conn_retrans_timer(nb);
1601 spin_unlock_bh(&(nb->retrans_conn_lock));
1603 if (bytes_sent > 0)
1604 nbcongwin_data_retransmitted(nb, bytes_sent);
1606 return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
1609 void retransmit_conn_taskfunc(unsigned long arg)
1611 struct neighbor *nb = (struct neighbor *) arg;
1612 int sent = 0;
1613 send_retrans(nb, 0, &sent);
1614 kref_put(&(nb->ref), neighbor_free);
1617 void retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
1619 struct neighbor *nb = container_of(retrans_conn_timer,
1620 struct neighbor, retrans_conn_timer);
1622 spin_lock_bh(&(nb->retrans_conn_lock));
1624 BUG_ON(nb->retrans_conn_timer_running == 0);
1625 BUG_ON(nb->retrans_conn_running == 1);
1627 nb->retrans_conn_timer_running = 0;
1628 nb->retrans_conn_running = 1;
1630 spin_unlock_bh(&(nb->retrans_conn_lock));
1632 tasklet_schedule(&(nb->retrans_conn_task));
1635 static void conn_ack_ooo_rcvd_splitcr(struct conn *trgt_out_l,
1636 struct conn_retrans *cr, __u64 seqno_ooo, __u32 length,
1637 __u64 *bytes_acked)
1639 struct conn_retrans *cr2;
1640 __u64 seqno_cr2start;
1641 __u32 oldcrlength = cr->length;
1643 if (cr->state != CONN_RETRANS_SCHEDULED &&
1644 cr->state != CONN_RETRANS_LOWWINDOW)
1645 return;
1647 seqno_cr2start = seqno_ooo+length;
1648 cr2 = prepare_conn_retrans(trgt_out_l, seqno_cr2start,
1649 seqno_clean(cr->seqno + cr->length - seqno_cr2start),
1650 cr->snd_delayed_lowbuf, cr, 1);
1652 if (unlikely(cr2 == 0))
1653 return;
1655 BUG_ON(cr2->length > cr->length);
1657 cr2->timeout = cr->timeout;
1658 cr2->state = cr->state;
1660 if (cr->state != CONN_RETRANS_SCHEDULED)
1661 list_add(&(cr2->timeout_list), &(cr->timeout_list));
1663 BUG_ON(seqno_clean(seqno_ooo - cr->seqno) > cr->length);
1665 cr->length -= seqno_clean(seqno_ooo - cr->seqno);
1666 BUG_ON(cr->length + length + cr2->length != oldcrlength);
1668 *bytes_acked += length;
1671 void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
1672 struct conn *trgt_out, __u64 seqno_ooo, __u32 length,
1673 __u64 *bytes_acked)
1675 struct list_head *curr;
1677 if (unlikely(length == 0))
1678 return;
1680 spin_lock_bh(&(trgt_out->rcv_lock));
1682 if (unlikely(trgt_out->targettype != TARGET_OUT))
1683 goto out;
1684 if (unlikely(trgt_out->target.out.nb != nb))
1685 goto out;
1686 if (unlikely(trgt_out->target.out.conn_id != conn_id))
1687 goto out;
1689 kref_get(&(nb->ref));
1690 spin_lock_bh(&(nb->retrans_conn_lock));
1692 curr = trgt_out->target.out.retrans_list.next;
1694 while (curr != &(trgt_out->target.out.retrans_list)) {
1695 struct conn_retrans *cr = container_of(curr,
1696 struct conn_retrans, conn_list);
1698 int ack_covers_start = seqno_after_eq(cr->seqno, seqno_ooo);
1699 int ack_covers_end = seqno_before_eq(cr->seqno + cr->length,
1700 seqno_ooo + length);
1702 curr = curr->next;
1704 if (seqno_before(cr->seqno + cr->length, seqno_ooo))
1705 continue;
1707 if (seqno_after(cr->seqno, seqno_ooo + length))
1708 break;
1710 if (likely(ack_covers_start && ack_covers_end)) {
1711 cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
1712 reschedule_conn_retrans_timer(nb);
1713 } else if (ack_covers_start) {
1714 __u32 diff = seqno_ooo + length - cr->seqno -
1715 cr->length;
1716 BUG_ON(diff >= cr->length);
1717 cr->seqno += diff;
1718 cr->length -= diff;
1719 *bytes_acked += diff;
1720 } else if (ack_covers_end) {
1721 __u32 diff = seqno_ooo + length - cr->seqno;
1722 BUG_ON(diff >= length);
1723 cr->length -= diff;
1724 *bytes_acked += diff;
1725 } else {
1726 conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
1727 length, bytes_acked);
1728 break;
1732 if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)) == 0)) {
1733 trgt_out->target.out.seqno_acked =
1734 trgt_out->target.out.seqno_nextsend;
1735 } else {
1736 struct conn_retrans *cr = container_of(
1737 trgt_out->target.out.retrans_list.next,
1738 struct conn_retrans, conn_list);
1739 if (seqno_after(cr->seqno, trgt_out->target.out.seqno_acked))
1740 trgt_out->target.out.seqno_acked = cr->seqno;
1743 spin_unlock_bh(&(nb->retrans_conn_lock));
1744 kref_put(&(nb->ref), neighbor_free);
1746 out:
1747 spin_unlock_bh(&(trgt_out->rcv_lock));
1750 static void _conn_ack_rcvd_nosendwin(struct conn *trgt_out_l)
1752 if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
1753 trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
1754 trgt_out_l->bufsize.state = BUFSIZE_NOACTION;
1756 if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
1757 trgt_out_l->bufsize.act.noact.bytesleft = max(
1758 trgt_out_l->bufsize.act.noact.bytesleft,
1759 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1761 trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
1762 trgt_out_l->bufsize.ignore_rcv_lowbuf,
1763 (__u32) BUF_OUT_WIN_NOK_NOINCR);
/*
1767  * nb->retrans_conn_lock must be held when calling this
1768  * (see schedule_retransmit_conn())
 */
1770 static void reschedule_lowwindow_retrans(struct conn *trgt_out_l)
1772 struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
1773 int cnt = 0;
1775 while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
1776 struct conn_retrans *cr;
1778 if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
1779 BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
1780 65535);
1781 trgt_out_l->target.out.retrans_lowwindow = 0;
1782 break;
1785 cr = container_of(lh, struct conn_retrans, conn_list);
1787 if (seqno_after_eq(cr->seqno,
1788 trgt_out_l->target.out.seqno_windowlimit)) {
1789 break;
1792 if (cr->state == CONN_RETRANS_LOWWINDOW)
1793 schedule_retransmit_conn(cr, 1, 1);
1795 lh = lh->next;
1796 cnt++;
1800 void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
1801 __u64 seqno, int setwindow, __u8 window, __u64 *bytes_acked)
1803 int seqno_advanced = 0;
1804 int window_enlarged = 0;
1806 spin_lock_bh(&(trgt_out->rcv_lock));
1808 if (unlikely(trgt_out->isreset != 0))
1809 goto out;
1810 if (unlikely(trgt_out->targettype != TARGET_OUT))
1811 goto out;
1812 if (unlikely(trgt_out->target.out.nb != nb))
1813 goto out;
1814 if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
1815 goto out;
1817 if (unlikely(seqno_after(seqno, trgt_out->target.out.seqno_nextsend) ||
1818 seqno_before(seqno, trgt_out->target.out.seqno_acked)))
1819 goto out;
1821 if (setwindow) {
1822 __u64 windowdec = dec_log_64_7(window);
1823 if (likely(seqno_after(seqno,
1824 trgt_out->target.out.seqno_acked)) ||
1825 seqno_after(seqno + windowdec,
1826 trgt_out->target.out.seqno_windowlimit)) {
1827 trgt_out->target.out.seqno_windowlimit = seqno +
1828 windowdec;
1829 window_enlarged = 1;
1833 if (seqno_after(seqno, trgt_out->target.out.seqno_acked))
1834 seqno_advanced = 1;
1836 if (seqno_advanced == 0 && window_enlarged == 0)
1837 goto out;
1839 kref_get(&(nb->ref));
1840 spin_lock_bh(&(nb->retrans_conn_lock));
1842 if (seqno_advanced) {
1843 trgt_out->target.out.seqno_acked = seqno;
1844 cancel_acked_conn_retrans(trgt_out, bytes_acked);
1847 if (window_enlarged)
1848 reschedule_lowwindow_retrans(trgt_out);
1850 spin_unlock_bh(&(nb->retrans_conn_lock));
1851 kref_put(&(nb->ref), neighbor_free);
1853 if (seqno_advanced)
1854 databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);
1856 if (seqno_eq(trgt_out->target.out.seqno_acked,
1857 trgt_out->target.out.seqno_nextsend))
1858 _conn_ack_rcvd_nosendwin(trgt_out);
1860 out:
1861 if (seqno_advanced || window_enlarged)
1862 flush_buf(trgt_out);
1864 spin_unlock_bh(&(trgt_out->rcv_lock));
1866 wake_sender(trgt_out);
1869 static void try_combine_conn_retrans_prev(struct neighbor *nb_retransconnlocked,
1870 struct conn *trgt_out_lx, struct conn_retrans *cr)
1872 struct conn_retrans *cr_prev;
1873 __u64 bytes_dummyacked = 0;
1875 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1877 if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
1878 return;
1880 cr_prev = container_of(cr->conn_list.prev, struct conn_retrans,
1881 conn_list);
1883 if (cr_prev->state != CONN_RETRANS_SCHEDULED)
1884 return;
1885 if (cr_prev->timeout != cr->timeout)
1886 return;
1887 if (!seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
1888 return;
1890 cr->seqno -= cr_prev->length;
1891 cr->length += cr_prev->length;
1893 cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
1894 &bytes_dummyacked);
1897 static void try_combine_conn_retrans_next(struct neighbor *nb_retranslocked,
1898 struct conn *trgt_out_lx, struct conn_retrans *cr)
1900 struct conn_retrans *cr_next;
1901 __u64 bytes_dummyacked = 0;
1903 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1905 if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
1906 return;
1908 cr_next = container_of(cr->conn_list.next, struct conn_retrans,
1909 conn_list);
1911 if (cr_next->state != CONN_RETRANS_SCHEDULED)
1912 return;
1913 if (cr_next->timeout != cr->timeout)
1914 return;
1915 if (!seqno_eq(cr->seqno + cr->length, cr_next->seqno))
1916 return;
1918 cr->length += cr_next->length;
1920 cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
1921 &bytes_dummyacked);
1924 void schedule_retransmit_conn(struct conn_retrans *cr, int connlocked,
1925 int nbretransconn_locked)
1927 struct conn *trgt_out_o = cr->trgt_out_o;
1928 struct neighbor *nb;
1929 int first;
1931 if (connlocked == 0)
1932 spin_lock_bh(&(trgt_out_o->rcv_lock));
1934 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1935 nb = trgt_out_o->target.out.nb;
1937 cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
1938 atomic_read(&(nb->latency_stddev_retrans_us)),
1939 atomic_read(&(nb->max_remote_ackconn_delay_us)));
1941 if (nbretransconn_locked == 0)
1942 spin_lock_bh(&(nb->retrans_conn_lock));
1944 kref_get(&(nb->ref));
1946 BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);
1948 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1949 goto out;
1950 } else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
1951 BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
1952 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1953 trgt_out_o->target.out.retrans_lowwindow--;
1956 first = unlikely(list_empty(&(nb->retrans_conn_list)));
1957 list_add_tail(&(cr->timeout_list), &(nb->retrans_conn_list));
1958 cr->state = CONN_RETRANS_SCHEDULED;
1960 if (unlikely(first)) {
1961 reschedule_conn_retrans_timer(nb);
1962 } else {
1963 try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
1964 try_combine_conn_retrans_next(nb, trgt_out_o, cr);
1967 out:
1968 if (nbretransconn_locked == 0)
1969 spin_unlock_bh(&(nb->retrans_conn_lock));
1971 kref_put(&(nb->ref), neighbor_free);
1973 if (connlocked == 0)
1974 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1977 static int _flush_out_skb(struct conn *trgt_out_lx, __u32 len,
1978 __u8 snd_delayed_lowbuf)
1980 struct neighbor *nb = trgt_out_lx->target.out.nb;
1982 __u64 seqno;
1983 struct conn_retrans *cr;
1984 struct sk_buff *skb;
1985 char *dst;
1986 __u8 flush = 0;
1987 int rc;
1989 if (trgt_out_lx->flush != 0 &&
1990 trgt_out_lx->data_buf.read_remaining == len)
1991 flush = 1;
1993 seqno = trgt_out_lx->target.out.seqno_nextsend;
1994 skb = create_packet_conndata(trgt_out_lx->target.out.nb, len,
1995 GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
1996 snd_delayed_lowbuf, flush);
1997 if (unlikely(skb == 0))
1998 return RC_FLUSH_CONN_OUT_OOM;
2000 cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
2001 0, 0);
2002 if (unlikely(cr == 0)) {
2003 kfree_skb(skb);
2004 return RC_FLUSH_CONN_OUT_OOM;
2007 dst = skb_put(skb, len);
2009 databuf_pull(trgt_out_lx, dst, len);
2011 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
2012 if (rc == NET_XMIT_DROP) {
2013 databuf_unpull(trgt_out_lx, len);
2014 spin_lock_bh(&(nb->retrans_conn_lock));
2015 cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
2016 spin_unlock_bh(&(nb->retrans_conn_lock));
2017 kref_put(&(cr->ref), free_connretrans);
2018 return RC_FLUSH_CONN_OUT_CONG;
2021 trgt_out_lx->target.out.seqno_nextsend += len;
2022 nbcongwin_data_sent(nb, len);
2023 schedule_retransmit_conn(cr, 1, 0);
2024 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
2025 update_src_sock_sndspeed(trgt_out_lx, len);
2027 kref_put(&(cr->ref), free_connretrans);
2029 return (rc == NET_XMIT_SUCCESS) ?
2030 RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
2033 static int _flush_out_conndata(struct conn *trgt_out_lx, __u32 len,
2034 __u8 snd_delayed_lowbuf)
2036 __u64 seqno;
2037 struct control_msg_out *cm;
2038 struct conn_retrans *cr;
2039 char *buf;
2040 __u8 flush = 0;
2042 if (trgt_out_lx->flush != 0 &&
2043 trgt_out_lx->data_buf.read_remaining == len)
2044 flush = 1;
2046 buf = kmalloc(len, GFP_ATOMIC);
2048 if (unlikely(buf == 0))
2049 return RC_FLUSH_CONN_OUT_OOM;
2051 cm = alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
2052 if (unlikely(cm == 0)) {
2053 kfree(buf);
2054 return RC_FLUSH_CONN_OUT_OOM;
2057 seqno = trgt_out_lx->target.out.seqno_nextsend;
2059 cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
2060 0, 0);
2061 if (unlikely(cr == 0)) {
2062 kfree(buf);
2063 free_control_msg(cm);
2064 return RC_FLUSH_CONN_OUT_OOM;
2067 databuf_pull(trgt_out_lx, buf, len);
2068 trgt_out_lx->target.out.seqno_nextsend += len;
2069 nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
2070 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
2071 update_src_sock_sndspeed(trgt_out_lx, len);
2073 send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf, len,
2074 snd_delayed_lowbuf, flush, trgt_out_lx->is_highlatency,
2075 cr);
2077 return RC_FLUSH_CONN_OUT_OK;
2080 int srcin_buflimit_reached(struct conn *src_in_lx)
2082 __u64 window_left;
2084 if (unlikely(seqno_before(src_in_lx->source.in.window_seqnolimit,
2085 src_in_lx->source.in.next_seqno)))
2086 return 1;
2088 window_left = seqno_clean(src_in_lx->source.in.window_seqnolimit -
2089 src_in_lx->source.in.next_seqno);
2091 if (window_left < WINDOW_ENCODE_MIN)
2092 return 1;
2094 if (window_left/2 < src_in_lx->data_buf.read_remaining)
2095 return 1;
2097 return 0;
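/* In other words: the source-in buffer counts as full once fewer than
 * WINDOW_ENCODE_MIN bytes of announced window remain, or once the buffered
 * data exceeds half of the remaining window. */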
2100 static __u32 maxsend_left_to_len(__u32 maxsend_left)
2102 __u32 i;
2103 if (maxsend_left < 128)
2104 return maxsend_left;
2106 for (i=128;i<4096;) {
2107 if (i*2 > maxsend_left)
2108 return i;
2109 i = i*2;
2112 return maxsend_left - maxsend_left%4096;
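/* The helper above rounds the remaining send budget down to a coarser size:
 * values below 128 pass through, values up to 8191 are rounded down to a
 * power of two (128 ... 4096), larger values to a multiple of 4096.
 * E.g. 300 -> 256, 5000 -> 4096, 10000 -> 8192. */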
static int seqno_low_sendlimit(struct conn *trgt_out_lx, __u64 windowlimit,
		__u32 sndlen)
{
	__u64 bytes_ackpending;

	BUG_ON(seqno_before(trgt_out_lx->target.out.seqno_nextsend,
			trgt_out_lx->target.out.seqno_acked));

	bytes_ackpending = seqno_clean(trgt_out_lx->target.out.seqno_nextsend -
			trgt_out_lx->target.out.seqno_acked);

	if (windowlimit <= sndlen)
		return 1;

	if (unlikely(bytes_ackpending + sndlen < bytes_ackpending))
		return 0;

	if (trgt_out_lx->is_highlatency != 0)
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 4)
				? 1 : 0;
	else
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 8)
				? 1 : 0;
}

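/*
 * Called whenever _flush_out() below has to stop or slow down because of
 * the send window: raises bufsize.ignore_rcv_lowbuf to at least one full
 * buffer size (the bufsize field is apparently stored left-shifted by
 * BUFSIZE_SHIFT), presumably so that the receive-side low-buffer handling
 * does not throttle this connection on top of the window limit.
 */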
static void _flush_out_ignore_lowbuf(struct conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

static __u64 get_windowlimit(struct conn *trgt_out_lx)
{
	if (unlikely(seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_nextsend)))
		return 0;

	return seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_nextsend);
}

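/*
 * Core transmit routine for a TARGET_OUT connection; expects the connection
 * to be locked by the caller (this is what the _lx suffix used throughout
 * this file appears to denote). After the cheap exit checks (not yet
 * established, reset, nothing buffered, qos or neighbor state not ready) it
 * sends full mss_conndata() sized packets in a loop and then, if the
 * source-specific "wait for more data" checks allow it, one trailing packet
 * with the remainder. Every packet that goes out advances seqno_nextsend
 * and is charged to the neighbor congestion window in the helpers above;
 * its length is added to *sent. maxsend bounds the total number of bytes
 * handed out by one call.
 *
 * Return values: RC_FLUSH_CONN_OUT_OK (done or nothing to send),
 * RC_FLUSH_CONN_OUT_CONG (qos/nbcongwin refused or a packet was dropped),
 * RC_FLUSH_CONN_OUT_MAXSENT (maxsend exhausted), RC_FLUSH_CONN_OUT_OOM
 * (allocation failure) and RC_FLUSH_CONN_OUT_NBNOTACTIVE (neighbor not
 * active; if it is merely stalled the connection is parked on
 * nb->stalledconn_list and picked up again by resume_nbstalled_conns()).
 */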
static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos)
{
	struct neighbor *nb = trgt_out_lx->target.out.nb;

	__u32 targetmss;

	int nbstate;

	__u8 snd_delayed_lowbuf = trgt_out_lx->target.out.windowlimit_reached;

	__u32 maxsend_left = maxsend;

	trgt_out_lx->target.out.windowlimit_reached = 0;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->target.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

#warning todo burst queue
	if (from_qos == 0 && qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	spin_lock_bh(&(nb->stalledconn_lock));
	nbstate = get_neigh_state(nb);
	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next != 0);
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next == 0);

		if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
			kref_get(&(trgt_out_lx->ref));
			list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
					&(nb->stalledconn_list));
		}
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
			get_windowlimit(trgt_out_l),
			trgt_out_l->data_buf.read_remaining); */

	targetmss = mss_conndata(nb);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, targetmss)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (likely(send_conndata_as_skb(nb, targetmss)))
			rc = _flush_out_skb(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left == maxsend && maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0) {
				len = maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				srcin_buflimit_reached(trgt_out_lx)
				== 0 && (
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cpacket_write_allowed(trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, len)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (len > windowlimit) {
			len = windowlimit;
			_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (send_conndata_as_skb(nb, len))
			rc = _flush_out_skb(trgt_out_lx, len,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, len,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}

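/*
 * Wrapper around _flush_out() with an effectively unlimited maxsend of
 * 1 << 30 bytes. If the flush stops because of congestion, the maxsend
 * limit or an allocation failure, the connection is re-enqueued on its qos
 * queue so that sending is retried later.
 */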
int flush_out(struct conn *trgt_out_lx, __u32 *sent)
{
	int rc = _flush_out(trgt_out_lx, 1 << 30, sent, 0);

	if (rc == RC_FLUSH_CONN_OUT_CONG || rc == RC_FLUSH_CONN_OUT_MAXSENT ||
			rc == RC_FLUSH_CONN_OUT_OOM)
		qos_enqueue_conn(trgt_out_lx);

	return rc;
}

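/*
 * Work item handler for nb->stalledconn_work, presumably scheduled when a
 * stalled neighbor becomes active again: it drains nb->stalledconn_list,
 * flushing each parked connection under its own rcv_lock (the list lock is
 * dropped around the flush) and waking the sender if anything went out.
 * The loop stops early if flush_out() reports
 * RC_FLUSH_CONN_OUT_NBNOTACTIVE, and the neighbor reference held by the
 * work item is dropped at the end.
 */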
void resume_nbstalled_conns(struct work_struct *work)
{
	struct neighbor *nb = container_of(work, struct neighbor,
			stalledconn_work);
	int rc = RC_FLUSH_CONN_OUT_OK;

	spin_lock_bh(&(nb->stalledconn_lock));
	nb->stalledconn_work_scheduled = 0;
	while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
			list_empty(&(nb->stalledconn_list)) == 0) {
		struct list_head *lh = nb->stalledconn_list.next;
		struct conn *trgt_out = container_of(lh, struct conn,
				target.out.nbstalled_lh);
		__u32 sent = 0;
		BUG_ON(trgt_out->targettype != TARGET_OUT);
		list_del(lh);
		lh->prev = 0;
		lh->next = 0;

		spin_unlock_bh(&(nb->stalledconn_lock));

		spin_lock_bh(&(trgt_out->rcv_lock));
		if (likely(trgt_out->targettype == TARGET_OUT))
			rc = flush_out(trgt_out, &sent);
		spin_unlock_bh(&(trgt_out->rcv_lock));

		if (sent != 0)
			wake_sender(trgt_out);

		kref_put(&(trgt_out->ref), free_conn);

		spin_lock_bh(&(nb->stalledconn_lock));
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	kref_put(&(nb->ref), neighbor_free);
}

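/*
 * Creates the slab cache backing struct conn_retrans; if that fails, module
 * initialisation fails with -ENOMEM.
 */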
int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);
	if (unlikely(connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

MODULE_LICENSE("GPL");