send_announce: handle queue does not exist, mngdsocket: fix flushing, high latency...
[cor.git] / net / cor / snd.c
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2020 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
24 #include <linux/kthread.h>
26 #include "cor.h"
28 static struct kmem_cache *connretrans_slab;
30 static DEFINE_SPINLOCK(queues_lock);
31 static LIST_HEAD(queues);
32 static LIST_HEAD(queues_waitexit);
34 static void qos_waitexit(struct work_struct *work);
35 DECLARE_WORK(qos_waitexit_work, qos_waitexit);
37 static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
38 int from_qos, int maxsend_forcedelay);
40 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
41 ktime_t cmsg_send_start, int caller, int from_nbcongwin_resume);
44 #ifdef DEBUG_QOS_SLOWSEND
45 static DEFINE_SPINLOCK(slowsend_lock);
46 static unsigned long last_send;
49 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
51 int allowsend = 0;
52 unsigned long jiffies_tmp;
53 spin_lock_bh(&slowsend_lock);
54 jiffies_tmp = jiffies;
55 if (last_send != jiffies_tmp) {
56 if (last_send + 1 == jiffies_tmp) {
57 last_send = jiffies_tmp;
58 } else {
59 last_send = jiffies_tmp - 1;
61 allowsend = 1;
63 spin_unlock_bh(&slowsend_lock);
65 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
66 if (allowsend) {
67 return dev_queue_xmit(skb);
68 } else {
69 kfree_skb(skb);
70 return NET_XMIT_DROP;
73 #endif
75 static void free_connretrans(struct kref *ref)
77 struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
78 struct conn *cn = cr->trgt_out_o;
80 BUG_ON(cr->state != CONN_RETRANS_ACKED);
82 kmem_cache_free(connretrans_slab, cr);
83 kref_put(&(cn->ref), free_conn);
86 void free_qos(struct kref *ref)
88 struct qos_queue *q = container_of(ref, struct qos_queue, ref);
89 kfree(q);
93 static void qos_queue_set_congstatus(struct qos_queue *q_locked);
95 /**
96 * neighbor congestion window:
97  * increment by 4096 every round trip if more than 2/3 of cwin is used
99 * in case of packet loss decrease by 1/4:
100 * - <= 1/8 immediately and
101 * - <= 1/4 during the next round trip
103 * in case of multiple packet loss events, do not decrement more than once per
104  * round trip */
107 #ifdef COR_NBCONGWIN
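/*
 * Illustrative sketch added for this writeup (not part of the original
 * file): the window rules described in the comment above, reduced to
 * plain byte counters. It ignores the NBCONGWIN_SHIFT fixed-point
 * scaling and the cwin/cwin_shrinkto split that the real code below
 * uses to spread the decrease over two round trips; the helper names
 * are made up for illustration only.
 */
static inline __u64 nbcongwin_sketch_rtt_end(__u64 cwin, __u64 intransit)
{
	/* grow by 4096 bytes per round trip, but only while more than
	 * 2/3 of the window was actually in use
	 */
	if (intransit * 3 > cwin * 2)
		cwin += 4096;
	return cwin;
}

static inline __u64 nbcongwin_sketch_loss_event(__u64 cwin)
{
	/* a packet loss event shrinks the window by 1/4; further losses
	 * within the same round trip are ignored by the real code
	 */
	return cwin - cwin / 4;
}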
109 /*extern __u64 get_bufspace_used(void);
111 static void print_conn_bufstats(struct neighbor *nb)
113 / * not threadsafe, but this is only for debugging... * /
114 __u64 totalsize = 0;
115 __u64 read_remaining = 0;
116 __u32 numconns = 0;
117 struct list_head *lh;
118 unsigned long iflags;
120 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
122 lh = nb->conns_waiting.lh.next;
123 while (lh != &(nb->conns_waiting.lh)) {
124 struct conn *cn = container_of(lh, struct conn,
125 target.out.rb.lh);
126 totalsize += cn->data_buf.datasize;
127 read_remaining += cn->data_buf.read_remaining;
128 lh = lh->next;
131 lh = nb->conns_waiting.lh_nextpass.next;
132 while (lh != &(nb->conns_waiting.lh_nextpass)) {
133 struct conn *cn = container_of(lh, struct conn,
134 target.out.rb.lh);
135 totalsize += cn->data_buf.datasize;
136 read_remaining += cn->data_buf.read_remaining;
137 lh = lh->next;
140 numconns = nb->conns_waiting.cnt;
142 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
144 printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining, numconns);
145 } */
147 static void nbcongwin_data_retransmitted(struct neighbor *nb, __u64 bytes_sent)
149 __u64 min_cwin = mss_conndata(nb, 0)*2 << NBCONGWIN_SHIFT;
150 __u64 cwin;
152 unsigned long iflags;
154 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
156 cwin = atomic64_read(&(nb->nbcongwin.cwin));
158 /* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
159 get_bufspace_used());
160 print_conn_bufstats(nb); */
162 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
163 BUG_ON(cwin >= U64_MAX/1024);
165 if (bytes_sent > 1024)
166 bytes_sent = 1024;
168 if (nb->nbcongwin.cwin_shrinkto == cwin) {
169 if (bytes_sent > 512) {
170 cwin -= cwin/16;
171 } else {
172 cwin -= (bytes_sent * cwin) / (1024 * 8);
174 if (cwin < min_cwin)
175 cwin = min_cwin;
176 atomic64_set(&(nb->nbcongwin.cwin), cwin);
179 nb->nbcongwin.cwin_shrinkto -=
180 (bytes_sent * nb->nbcongwin.cwin_shrinkto) / (1024 * 8);
182 nb->nbcongwin.cwin_shrinkto = max(nb->nbcongwin.cwin_shrinkto,
183 cwin - cwin/8);
185 if (nb->nbcongwin.cwin_shrinkto < min_cwin)
186 nb->nbcongwin.cwin_shrinkto = min_cwin;
188 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
191 static __u64 nbcongwin_update_cwin(struct neighbor *nb_cwlocked,
192 __u64 data_intransit, __u64 bytes_acked)
194 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
195 __u32 INCR_PER_RTT = 4096;
197 __u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));
199 __u64 cwin_tmp;
200 __u64 incrby;
202 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
203 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
204 if (unlikely(shrinkby > cwin))
205 cwin = 0;
206 else
207 cwin -= shrinkby;
209 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
210 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
214 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
215 goto out;
217 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
219 if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
220 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
221 cwin_tmp / CWIN_MUL / CWIN_MUL);
222 else if (unlikely(bytes_acked >=
223 U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
224 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
225 cwin_tmp / CWIN_MUL);
226 else
227 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
228 CWIN_MUL, cwin_tmp);
230 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
232 if (unlikely(cwin + incrby < cwin))
233 cwin = U64_MAX;
234 else
235 cwin += incrby;
237 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
238 nb_cwlocked->nbcongwin.cwin_shrinkto))
239 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
240 else
241 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
243 out:
244 atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);
246 return cwin;
249 void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
251 unsigned long iflags;
252 struct qos_queue *q = nb->queue;
253 __u64 data_intransit;
254 __u64 cwin;
256 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
258 data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));
260 cwin = nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
262 BUG_ON(bytes_acked > data_intransit);
263 atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
264 data_intransit -= bytes_acked;
266 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
267 goto out_sendnok;
269 spin_lock(&(q->qlock));
270 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
271 if (nb->conns_waiting.cnt == 0) {
272 nb->rb.in_queue = RB_INQUEUE_FALSE;
273 } else {
274 _qos_enqueue(q, &(nb->rb), ns_to_ktime(0),
275 QOS_CALLER_NEIGHBOR, 1);
278 spin_unlock(&(q->qlock));
281 out_sendnok:
282 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
285 static void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
287 atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
290 static int nbcongwin_send_allowed(struct neighbor *nb)
292 unsigned long iflags;
293 int ret = 1;
294 struct qos_queue *q = nb->queue;
295 int krefput_queue = 0;
297 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
298 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
299 return 1;
301 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
303 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
304 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
305 goto out_ok;
307 ret = 0;
309 spin_lock(&(q->qlock));
310 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
311 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
312 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
313 list_del(&(nb->rb.lh));
314 kref_put(&(nb->ref), kreffree_bug);
315 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
316 BUG_ON(q->numconns < nb->conns_waiting.cnt);
317 q->numconns -= nb->conns_waiting.cnt;
318 q->priority_sum -= nb->conns_waiting.priority_sum;
319 krefput_queue = 1;
321 qos_queue_set_congstatus(q);
322 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
323 } else {
324 BUG();
326 spin_unlock(&(q->qlock));
328 if (krefput_queue != 0)
329 kref_put(&(q->ref), free_qos);
331 out_ok:
332 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
334 return ret;
337 #else
339 static inline void nbcongwin_data_retransmitted(struct neighbor *nb,
340 __u64 bytes_sent)
344 static inline void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
348 static inline void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
352 static inline int nbcongwin_send_allowed(struct neighbor *nb)
354 return 1;
357 #endif
359 static void _resume_conns_accountbusytime(struct conn *trgt_out_l,
360 __u32 priority, __u32 burstprio,
361 unsigned long jiffies_nb_lastduration)
364 unsigned long jiffies_tmp = jiffies;
365 __u64 jiffies_last_idle_mul = (1LL << JIFFIES_LAST_IDLE_SHIFT);
366 __u64 burstfactor;
367 __u64 jiffies_shifted_busy;
369 BUG_ON(burstprio < priority);
371 burstfactor = div_u64(1024LL * (__u64) burstprio, priority);
372 BUG_ON(burstfactor < 1024);
373 burstfactor = 1024 + (burstfactor - 1024) * 2;
375 jiffies_shifted_busy = (jiffies_nb_lastduration * burstfactor *
376 jiffies_last_idle_mul) / 1024;
378 BUG_ON(BURSTPRIO_MAXIDLETIME_SECS >
379 (1 << 30) / (HZ*jiffies_last_idle_mul));
381 if (unlikely(jiffies_shifted_busy > HZ * BURSTPRIO_MAXIDLETIME_SECS *
382 jiffies_last_idle_mul))
383 trgt_out_l->target.out.jiffies_idle_since =
384 jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
385 else
386 trgt_out_l->target.out.jiffies_idle_since +=
387 jiffies_shifted_busy;
389 if (unlikely(time_before(jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT,
390 trgt_out_l->target.out.jiffies_idle_since)))
391 trgt_out_l->target.out.jiffies_idle_since =
392 jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
395 static unsigned long get_conn_idletime(struct conn *trgt_out_l)
397 unsigned long jiffies_shifted = jiffies << JIFFIES_LAST_IDLE_SHIFT;
398 __u32 burst_maxidle_hz_shifted = (BURSTPRIO_MAXIDLETIME_SECS*HZ) <<
399 JIFFIES_LAST_IDLE_SHIFT;
400 unsigned long idletime_hz_shifted;
402 if (unlikely(time_before(jiffies_shifted,
403 trgt_out_l->target.out.jiffies_idle_since))) {
404 idletime_hz_shifted = 0;
405 trgt_out_l->target.out.jiffies_idle_since = jiffies_shifted -
406 burst_maxidle_hz_shifted;
407 } else {
408 idletime_hz_shifted = jiffies_shifted -
409 trgt_out_l->target.out.jiffies_idle_since;
411 if (unlikely(idletime_hz_shifted > burst_maxidle_hz_shifted)) {
412 idletime_hz_shifted = burst_maxidle_hz_shifted;
413 trgt_out_l->target.out.jiffies_idle_since =
414 jiffies_shifted -
415 burst_maxidle_hz_shifted;
419 return idletime_hz_shifted;
422 static __u32 _resume_conns_burstprio(struct conn *trgt_out_l, __u32 priority)
424 unsigned long idletime_hz_shifted = get_conn_idletime(trgt_out_l);
425 __u32 idletime_msecs = jiffies_to_msecs(idletime_hz_shifted >>
426 JIFFIES_LAST_IDLE_SHIFT);
427 __u32 burstfactor;
428 __u64 newprio;
430 BUG_ON(idletime_msecs > BURSTPRIO_MAXIDLETIME_SECS*1000);
431 BUG_ON(BURSTPRIO_MAXIDLETIME_SECS*1000LL > U32_MAX / 1024);
433 burstfactor = (1024 * idletime_msecs) /
434 (BURSTPRIO_MAXIDLETIME_SECS * 1000);
436 if (trgt_out_l->is_highlatency != 0)
437 newprio = (((__u64) priority) * (1024 + 1 * burstfactor)) /
438 1024;
439 else
440 newprio = (((__u64) priority) * (1024 + 2 * burstfactor)) /
441 1024;
443 BUG_ON(newprio > U32_MAX);
444 return (__u32) newprio;
447 static __u64 _resume_conns_maxsend(struct qos_queue *q, struct conn *trgt_out_l,
448 __u32 newpriority, int *maxsend_forcedelay)
450 unsigned long iflags;
452 struct neighbor *nb = trgt_out_l->target.out.nb;
453 __u32 oldpriority = trgt_out_l->target.out.rb_priority;
454 __u64 priority_sum;
455 __u32 numconns;
456 __u64 bytes_per_round;
458 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
459 spin_lock(&(q->qlock));
461 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
462 BUG_ON(q->priority_sum < oldpriority);
463 nb->conns_waiting.priority_sum -= oldpriority;
464 q->priority_sum -= oldpriority;
466 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
467 nb->conns_waiting.priority_sum);
468 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
469 nb->conns_waiting.priority_sum += newpriority;
470 q->priority_sum += newpriority;
472 priority_sum = q->priority_sum;
473 numconns = q->numconns;
475 spin_unlock(&(q->qlock));
476 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
478 trgt_out_l->target.out.rb_priority = newpriority;
480 if (numconns <= 4) {
481 *maxsend_forcedelay = 1;
482 bytes_per_round = 2048LL;
483 } else {
484 *maxsend_forcedelay = 0;
485 bytes_per_round = 1024LL;
488 if (trgt_out_l->is_highlatency != 0)
489 bytes_per_round += bytes_per_round/8;
491 return div_u64(bytes_per_round * ((__u64) newpriority) *
492 ((__u64) numconns), priority_sum);
495 static int _resume_neighbors_nextpass(struct neighbor *nb_waitingconnslocked)
497 BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);
499 if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
500 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
501 return 1;
504 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
506 swap_list_items(&(nb_waitingconnslocked->conns_waiting.lh),
507 &(nb_waitingconnslocked->conns_waiting.lh_nextpass));
509 return 0;
512 static int _resume_neighbors(struct qos_queue *q, struct neighbor *nb,
513 unsigned long jiffies_nb_lastduration, int *progress)
515 unsigned long iflags;
517 while (1) {
518 __u32 priority;
519 __u32 burstprio;
520 __u32 maxsend;
521 int maxsend_forcedelay;
523 int rc2;
524 __u32 sent2 = 0;
526 struct conn *cn = 0;
527 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
528 if (list_empty(&(nb->conns_waiting.lh)) != 0) {
529 int done = _resume_neighbors_nextpass(nb);
530 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
531 iflags);
532 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
534 BUG_ON(nb->conns_waiting.cnt == 0);
536 cn = container_of(nb->conns_waiting.lh.next, struct conn,
537 target.out.rb.lh);
538 BUG_ON(cn->targettype != TARGET_OUT);
539 BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
540 BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
541 (nb->conns_waiting.lh.prev !=
542 &(cn->target.out.rb.lh)));
543 list_del(&(cn->target.out.rb.lh));
544 list_add_tail(&(cn->target.out.rb.lh),
545 &(nb->conns_waiting.lh_nextpass));
546 kref_get(&(cn->ref));
547 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
550 priority = refresh_conn_priority(cn, 0);
552 spin_lock_bh(&(cn->rcv_lock));
554 if (unlikely(cn->targettype != TARGET_OUT)) {
555 spin_unlock_bh(&(cn->rcv_lock));
556 continue;
559 burstprio = _resume_conns_burstprio(cn, priority);
561 maxsend = _resume_conns_maxsend(q, cn, burstprio,
562 &maxsend_forcedelay);
563 if (cn->target.out.maxsend_extra >= maxsend)
564 maxsend_forcedelay = 0;
565 maxsend += cn->target.out.maxsend_extra;
566 if (unlikely(maxsend > U32_MAX))
567 maxsend = U32_MAX;
568 if (unlikely(maxsend >= 65536))
569 maxsend_forcedelay = 0;
571 rc2 = _flush_out(cn, maxsend, &sent2, 1, maxsend_forcedelay);
573 if (rc2 == RC_FLUSH_CONN_OUT_OK ||
574 rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
575 cn->target.out.maxsend_extra = 0;
576 qos_remove_conn(cn);
577 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
578 rc2 == RC_FLUSH_CONN_OUT_OOM)) {
579 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
580 if (likely(cn->target.out.rb.in_queue !=
581 RB_INQUEUE_FALSE)) {
582 list_del(&(cn->target.out.rb.lh));
583 list_add(&(cn->target.out.rb.lh),
584 &(nb->conns_waiting.lh));
586 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
587 iflags);
588 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
589 rc2 == RC_FLUSH_CONN_OUT_OOM) {
590 cn->target.out.maxsend_extra = 0;
591 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
592 if (unlikely(maxsend - sent2 > 65535))
593 cn->target.out.maxsend_extra = 65535;
594 else
595 cn->target.out.maxsend_extra = maxsend - sent2;
598 if (sent2 != 0)
599 _resume_conns_accountbusytime(cn, priority, burstprio,
600 jiffies_nb_lastduration);
602 spin_unlock_bh(&(cn->rcv_lock));
604 if (sent2 != 0) {
605 *progress = 1;
606 wake_sender(cn);
609 kref_put(&(cn->ref), free_conn);
611 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
612 rc2 == RC_FLUSH_CONN_OUT_OOM) {
613 return QOS_RESUME_CONG;
618 static struct neighbor *resume_neighbors_peeknextnb(struct qos_queue *q,
619 unsigned long *jiffies_nb_lastduration)
621 unsigned long iflags;
623 struct neighbor *nb;
625 spin_lock_irqsave(&(q->qlock), iflags);
627 if (list_empty(&(q->neighbors_waiting))) {
628 if (list_empty(&(q->neighbors_waiting_nextpass))) {
629 BUG_ON(q->numconns != 0);
630 spin_unlock_irqrestore(&(q->qlock), iflags);
632 return 0;
633 } else {
634 unsigned long jiffies_tmp = jiffies;
635 swap_list_items(&(q->neighbors_waiting),
636 &(q->neighbors_waiting_nextpass));
638 WARN_ONCE(time_before(jiffies_tmp,
639 q->jiffies_nb_pass_start),
640 "resume_neighbors_peeknextnb: jiffies after jiffies_nb_pass_start (this is only a performance issue)");
642 q->jiffies_nb_lastduration = jiffies -
643 q->jiffies_nb_pass_start;
644 q->jiffies_nb_pass_start = jiffies_tmp;
648 *jiffies_nb_lastduration = q->jiffies_nb_lastduration;
651 BUG_ON(q->numconns == 0);
652 BUG_ON(list_empty(&(q->neighbors_waiting)));
654 nb = container_of(q->neighbors_waiting.next, struct neighbor, rb.lh);
656 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
657 BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
658 BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
659 (q->neighbors_waiting.prev != &(nb->rb.lh)));
661 kref_get(&(nb->ref));
663 spin_unlock_irqrestore(&(q->qlock), iflags);
665 return nb;
668 static int resume_neighbors(struct qos_queue *q, int *sent)
670 unsigned long iflags;
671 int rc;
673 unsigned long jiffies_nb_lastduration;
674 struct neighbor *nb = resume_neighbors_peeknextnb(q,
675 &jiffies_nb_lastduration);
677 if (nb == 0)
678 return QOS_RESUME_DONE;
680 atomic_set(&(nb->cmsg_delay_conndata), 1);
682 rc = _resume_neighbors(q, nb, jiffies_nb_lastduration, sent);
683 if (rc == QOS_RESUME_CONG) {
684 kref_put(&(nb->ref), neighbor_free);
685 return QOS_RESUME_CONG;
687 BUG_ON(rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR);
689 atomic_set(&(nb->cmsg_delay_conndata), 0);
690 spin_lock_bh(&(nb->cmsg_lock));
691 schedule_controlmsg_timer(nb);
692 spin_unlock_bh(&(nb->cmsg_lock));
694 spin_lock_irqsave(&(q->qlock), iflags);
695 if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
696 if (nb->conns_waiting.cnt == 0) {
697 nb->rb.in_queue = RB_INQUEUE_FALSE;
698 list_del(&(nb->rb.lh));
699 kref_put(&(nb->ref), kreffree_bug);
700 } else {
701 list_del(&(nb->rb.lh));
702 list_add_tail(&(nb->rb.lh),
703 &(q->neighbors_waiting_nextpass));
706 spin_unlock_irqrestore(&(q->qlock), iflags);
708 kref_put(&(nb->ref), neighbor_free);
710 return QOS_RESUME_NEXTNEIGHBOR;
713 static int send_retrans(struct neighbor *nb, int *sent);
715 static int __qos_resume(struct qos_queue *q, int caller, int *sent)
717 unsigned long iflags;
718 int rc = QOS_RESUME_DONE;
719 struct list_head *lh;
721 spin_lock_irqsave(&(q->qlock), iflags);
723 if (caller == QOS_CALLER_KPACKET)
724 lh = &(q->kpackets_waiting);
725 else if (caller == QOS_CALLER_CONN_RETRANS)
726 lh = &(q->conn_retrans_waiting);
727 else if (caller == QOS_CALLER_ANNOUNCE)
728 lh = &(q->announce_waiting);
729 else
730 BUG();
732 while (list_empty(lh) == 0) {
733 struct resume_block *rb = container_of(lh->next,
734 struct resume_block, lh);
735 ktime_t cmsg_send_start;
736 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
737 rb->in_queue = RB_INQUEUE_FALSE;
738 list_del(&(rb->lh));
740 if (caller == QOS_CALLER_KPACKET)
741 cmsg_send_start = container_of(rb, struct neighbor,
742 rb_kp)->cmsg_send_start;
744 spin_unlock_irqrestore(&(q->qlock), iflags);
745 if (caller == QOS_CALLER_KPACKET) {
746 rc = send_messages(container_of(rb, struct neighbor,
747 rb_kp), cmsg_send_start, sent);
748 } else if (caller == QOS_CALLER_CONN_RETRANS) {
749 rc = send_retrans(container_of(rb, struct neighbor,
750 rb_cr), sent);
751 } else if (caller == QOS_CALLER_ANNOUNCE) {
752 rc = _send_announce(container_of(rb,
753 struct announce_data, rb), 1, sent);
754 } else {
755 BUG();
757 spin_lock_irqsave(&(q->qlock), iflags);
759 if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET)
760 container_of(rb, struct neighbor, rb_kp
761 )->cmsg_send_start = cmsg_send_start;
763 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
764 rb->in_queue = RB_INQUEUE_TRUE;
765 list_add(&(rb->lh), lh);
766 break;
769 if (caller == QOS_CALLER_KPACKET) {
770 kref_put(&(container_of(rb, struct neighbor,
771 rb_kp)->ref), neighbor_free);
772 } else if (caller == QOS_CALLER_CONN_RETRANS) {
773 kref_put(&(container_of(rb, struct neighbor,
774 rb_cr)->ref), neighbor_free);
775 } else if (caller == QOS_CALLER_ANNOUNCE) {
776 kref_put(&(container_of(rb,
777 struct announce_data, rb)->ref),
778 announce_data_free);
779 } else {
780 BUG();
783 kref_put(&(q->ref), kreffree_bug);
786 spin_unlock_irqrestore(&(q->qlock), iflags);
788 return rc;
791 static int _qos_resume(struct qos_queue *q, int *sent)
793 unsigned long iflags;
794 int i = QOS_CALLER_KPACKET;
795 int rc;
797 spin_lock_irqsave(&(q->qlock), iflags);
799 while (1) {
800 if (q->dev == 0) {
801 rc = QOS_RESUME_EXIT;
802 break;
805 if (i == QOS_CALLER_KPACKET &&
806 list_empty(&(q->kpackets_waiting))) {
807 i = QOS_CALLER_CONN_RETRANS;
808 continue;
809 } else if (i == QOS_CALLER_CONN_RETRANS &&
810 list_empty(&(q->conn_retrans_waiting))) {
811 i = QOS_CALLER_ANNOUNCE;
812 continue;
813 } else if (i == QOS_CALLER_ANNOUNCE &&
814 list_empty(&(q->announce_waiting))) {
815 i = QOS_CALLER_NEIGHBOR;
816 continue;
817 } else if (i == QOS_CALLER_NEIGHBOR &&
818 list_empty(&(q->neighbors_waiting)) &&
819 list_empty(&(q->neighbors_waiting_nextpass))) {
820 rc = QOS_RESUME_DONE;
821 break;
824 spin_unlock_irqrestore(&(q->qlock), iflags);
826 if (i == QOS_CALLER_NEIGHBOR) {
827 rc = resume_neighbors(q, sent);
828 } else {
829 rc = __qos_resume(q, i, sent);
832 spin_lock_irqsave(&(q->qlock), iflags);
834 if (rc == QOS_RESUME_CONG)
835 break;
837 i = QOS_CALLER_KPACKET;
840 if (rc == QOS_RESUME_DONE) {
841 BUG_ON(!list_empty(&(q->kpackets_waiting)));
842 BUG_ON(!list_empty(&(q->conn_retrans_waiting)));
843 BUG_ON(!list_empty(&(q->announce_waiting)));
844 BUG_ON(!list_empty(&(q->neighbors_waiting)));
845 BUG_ON(!list_empty(&(q->neighbors_waiting_nextpass)));
847 atomic_set(&(q->qos_resume_scheduled), 0);
850 qos_queue_set_congstatus(q);
852 if (q->dev == 0)
853 rc = QOS_RESUME_EXIT;
855 spin_unlock_irqrestore(&(q->qlock), iflags);
857 return rc;
860 int qos_resume_threadfunc(void *data)
862 struct qos_queue *q = (struct qos_queue *) data;
864 while (1) {
865 int sent = 0;
866 int rc;
868 rc = _qos_resume(q, &sent);
870 if (rc == QOS_RESUME_DONE) {
871 wait_event(q->qos_resume_wq,
872 atomic_read(&(q->qos_resume_scheduled))
873 != 0);
874 } else if (rc == QOS_RESUME_CONG) {
875 unsigned long jiffies_tmp = jiffies;
876 unsigned long delay_ms = 0;
878 if (sent)
879 q->jiffies_lastprogress = jiffies_tmp;
880 delay_ms = (jiffies_to_msecs(jiffies_tmp -
881 q->jiffies_lastprogress) + 8) / 4;
882 if (delay_ms < 2) {
883 delay_ms = 2;
884 } else if (delay_ms > 20) {
885 delay_ms = 20;
888 msleep(delay_ms);
889 } else if (rc == QOS_RESUME_EXIT) {
890 return 0;
891 } else {
892 BUG();
897 static inline int qos_queue_is_destroyed(struct qos_queue *q_locked)
899 return q_locked->dev == 0;
902 struct qos_queue *get_queue(struct net_device *dev)
904 struct qos_queue *ret = 0;
905 struct list_head *curr;
907 spin_lock_bh(&(queues_lock));
908 curr = queues.next;
909 while (curr != (&queues)) {
910 struct qos_queue *q = container_of(curr,
911 struct qos_queue, queue_list);
912 if (q->dev == dev) {
913 ret = q;
914 kref_get(&(ret->ref));
915 break;
917 curr = curr->next;
919 spin_unlock_bh(&(queues_lock));
920 return ret;
923 static void qos_waitexit(struct work_struct *work)
925 spin_lock_bh(&(queues_lock));
926 while (!list_empty(&queues_waitexit)) {
927 struct qos_queue *q = container_of(queues_waitexit.next,
928 struct qos_queue, queue_list);
929 list_del(&(q->queue_list));
931 spin_unlock_bh(&(queues_lock));
933 kthread_stop(q->qos_resume_thread);
934 put_task_struct(q->qos_resume_thread);
935 kref_put(&(q->ref), free_qos);
937 spin_lock_bh(&(queues_lock));
939 spin_unlock_bh(&(queues_lock));
942 static void _destroy_queue_kpackets(struct qos_queue *q)
944 while (list_empty(&(q->kpackets_waiting)) == 0) {
945 struct list_head *curr = q->kpackets_waiting.next;
946 struct resume_block *rb = container_of(curr,
947 struct resume_block, lh);
948 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
949 rb->in_queue = RB_INQUEUE_FALSE;
950 list_del(curr);
952 kref_put(&(container_of(rb, struct neighbor, rb_kp)->ref),
953 neighbor_free);
954 kref_put(&(q->ref), kreffree_bug);
958 static void _destroy_queue_conn_retrans(struct qos_queue *q)
960 while (list_empty(&(q->conn_retrans_waiting)) == 0) {
961 struct list_head *curr = q->conn_retrans_waiting.next;
962 struct resume_block *rb = container_of(curr,
963 struct resume_block, lh);
964 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
965 rb->in_queue = RB_INQUEUE_FALSE;
966 list_del(curr);
968 kref_put(&(container_of(rb, struct neighbor, rb_cr)->ref),
969 neighbor_free);
970 kref_put(&(q->ref), kreffree_bug);
974 static void _destroy_queue_announce(struct qos_queue *q)
976 while (list_empty(&(q->announce_waiting)) == 0) {
977 struct list_head *curr = q->announce_waiting.next;
978 struct resume_block *rb = container_of(curr,
979 struct resume_block, lh);
980 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
981 rb->in_queue = RB_INQUEUE_FALSE;
982 list_del(curr);
984 kref_put(&(container_of(rb, struct announce_data, rb)->ref),
985 announce_data_free);
986 kref_put(&(q->ref), kreffree_bug);
990 static void _destroy_queue_neighbor(struct qos_queue *q, struct list_head *lh)
992 while (list_empty(lh) == 0) {
993 struct list_head *curr = lh->next;
994 struct resume_block *rb = container_of(curr,
995 struct resume_block, lh);
996 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
997 rb->in_queue = RB_INQUEUE_FALSE;
998 list_del(curr);
1000 kref_put(&(container_of(rb, struct neighbor, rb)->ref),
1001 neighbor_free);
1002 kref_put(&(q->ref), kreffree_bug);
1006 static struct qos_queue *unlink_queue(struct net_device *dev)
1008 struct qos_queue *ret = 0;
1009 struct list_head *curr;
1011 spin_lock_bh(&(queues_lock));
1012 curr = queues.next;
1013 while (curr != (&queues)) {
1014 struct qos_queue *q = container_of(curr,
1015 struct qos_queue, queue_list);
1016 if (dev == 0 || q->dev == dev) {
1017 ret = q;
1018 kref_get(&(ret->ref));
1020 list_del(&(q->queue_list));
1021 kref_put(&(q->ref), kreffree_bug);
1022 break;
1024 curr = curr->next;
1026 spin_unlock_bh(&(queues_lock));
1027 return ret;
1030 int destroy_queue(struct net_device *dev)
1032 int rc = 1;
1033 unsigned long iflags;
1035 while (1) {
1036 struct qos_queue *q = unlink_queue(dev);
1038 if (q == 0)
1039 break;
1041 rc = 0;
1043 spin_lock_irqsave(&(q->qlock), iflags);
1044 if (q->dev != 0) {
1045 dev_put(q->dev);
1046 q->dev = 0;
1048 _destroy_queue_kpackets(q);
1049 _destroy_queue_conn_retrans(q);
1050 _destroy_queue_announce(q);
1051 _destroy_queue_neighbor(q, &(q->neighbors_waiting));
1052 _destroy_queue_neighbor(q, &(q->neighbors_waiting_nextpass));
1054 spin_unlock_irqrestore(&(q->qlock), iflags);
1056 schedule_qos_resume(q);
1058 spin_lock_bh(&(queues_lock));
1059 list_add(&(q->queue_list), &queues_waitexit);
1060 spin_unlock_bh(&(queues_lock));
1062 schedule_work(&qos_waitexit_work);
1065 return rc;
1068 int create_queue(struct net_device *dev)
1070 struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);
1072 if (q == 0) {
1073 printk(KERN_ERR "cor: unable to allocate memory for device "
1074 "queue, not enabling device");
1075 return 1;
1078 memset(q, 0, sizeof(struct qos_queue));
1080 spin_lock_init(&(q->qlock));
1082 kref_init(&(q->ref));
1084 q->dev = dev;
1085 dev_hold(dev);
1087 atomic_set(&(q->qos_resume_scheduled), 0);
1089 init_waitqueue_head(&(q->qos_resume_wq));
1091 INIT_LIST_HEAD(&(q->kpackets_waiting));
1092 INIT_LIST_HEAD(&(q->conn_retrans_waiting));
1093 INIT_LIST_HEAD(&(q->announce_waiting));
1094 INIT_LIST_HEAD(&(q->neighbors_waiting));
1095 INIT_LIST_HEAD(&(q->neighbors_waiting_nextpass));
1097 atomic_set(&(q->cong_status), 0);
1099 q->qos_resume_thread = kthread_create(qos_resume_threadfunc,
1100 q, "cor_qos_resume");
1101 if (q->qos_resume_thread == 0) {
1102 printk(KERN_ERR "cor: unable to start qos_resume thread");
1104 if (q->dev != 0) {
1105 dev_put(q->dev);
1106 q->dev = 0;
1109 kref_put(&(q->ref), free_qos);
1111 return 1;
1113 get_task_struct(q->qos_resume_thread);
1114 wake_up_process(q->qos_resume_thread);
1116 spin_lock_bh(&(queues_lock));
1117 list_add(&(q->queue_list), &queues);
1118 spin_unlock_bh(&(queues_lock));
1120 return 0;
1123 static void qos_queue_set_congstatus(struct qos_queue *q_locked)
1125 __u32 newstatus;
1127 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
1128 newstatus = CONGSTATUS_NONE;
1129 } else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
1130 newstatus = CONGSTATUS_KPACKETS;
1131 } else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
1132 newstatus = CONGSTATUS_RETRANS;
1133 } else if (list_empty(&(q_locked->announce_waiting)) == 0) {
1134 newstatus = CONGSTATUS_ANNOUNCE;
1135 } else if (list_empty(&(q_locked->neighbors_waiting)) == 0 ||
1136 list_empty(&(q_locked->neighbors_waiting_nextpass)) ==
1137 0) {
1138 newstatus = CONGSTATUS_CONNDATA;
1139 } else {
1140 newstatus = CONGSTATUS_NONE;
1143 atomic_set(&(q_locked->cong_status), newstatus);
1146 void qos_set_lastdrop(struct qos_queue *q)
1148 unsigned long iflags;
1150 spin_lock_irqsave(&(q->qlock), iflags);
1151 q->jiffies_lastdrop = jiffies;
1152 qos_queue_set_congstatus(q);
1153 spin_unlock_irqrestore(&(q->qlock), iflags);
1157 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
1158 * caller
1160 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
1161 ktime_t cmsg_send_start, int caller, int from_nbcongwin_resume)
1163 int queues_empty;
1165 if (rb->in_queue == RB_INQUEUE_TRUE) {
1166 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
1168 if (caller == QOS_CALLER_KPACKET) {
1169 struct neighbor *nb = container_of(rb, struct neighbor,
1170 rb_kp);
1171 if (ktime_before(cmsg_send_start, nb->cmsg_send_start))
1172 nb->cmsg_send_start = cmsg_send_start;
1174 return;
1175 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
1176 from_nbcongwin_resume == 0) {
1177 return;
1180 if (unlikely(qos_queue_is_destroyed(q)))
1181 return;
1183 queues_empty = list_empty(&(q->kpackets_waiting)) &&
1184 list_empty(&(q->conn_retrans_waiting)) &&
1185 list_empty(&(q->announce_waiting)) &&
1186 list_empty(&(q->neighbors_waiting)) &&
1187 list_empty(&(q->neighbors_waiting_nextpass));
1189 BUG_ON(!queues_empty && atomic_read(&(q->qos_resume_scheduled)) == 0);
1191 rb->in_queue = RB_INQUEUE_TRUE;
1193 if (caller == QOS_CALLER_KPACKET) {
1194 struct neighbor *nb = container_of(rb, struct neighbor, rb_kp);
1195 nb->cmsg_send_start = cmsg_send_start;
1196 list_add_tail(&(rb->lh), &(q->kpackets_waiting));
1197 kref_get(&(nb->ref));
1198 } else if (caller == QOS_CALLER_CONN_RETRANS) {
1199 list_add_tail(&(rb->lh), &(q->conn_retrans_waiting));
1200 kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
1201 } else if (caller == QOS_CALLER_ANNOUNCE) {
1202 list_add_tail(&(rb->lh), &(q->announce_waiting));
1203 kref_get(&(container_of(rb, struct announce_data, rb)->ref));
1204 } else if (caller == QOS_CALLER_NEIGHBOR) {
1205 struct neighbor *nb = container_of(rb, struct neighbor, rb);
1206 list_add_tail(&(rb->lh), &(q->neighbors_waiting_nextpass));
1207 kref_get(&(nb->ref));
1208 BUG_ON(nb->conns_waiting.cnt == 0);
1209 q->numconns += nb->conns_waiting.cnt;
1210 q->priority_sum += nb->conns_waiting.priority_sum;
1211 q->jiffies_nb_lastduration = 0;
1212 q->jiffies_nb_pass_start = jiffies;
1213 } else {
1214 BUG();
1216 kref_get(&(q->ref));
1218 schedule_qos_resume(q);
1220 qos_queue_set_congstatus(q);
1223 void qos_enqueue(struct qos_queue *q, struct resume_block *rb,
1224 ktime_t cmsg_send_start, int caller)
1226 unsigned long iflags;
1228 spin_lock_irqsave(&(q->qlock), iflags);
1229 _qos_enqueue(q, rb, cmsg_send_start, caller, 0);
1230 spin_unlock_irqrestore(&(q->qlock), iflags);
1233 void qos_remove_conn(struct conn *trgt_out_lx)
1235 unsigned long iflags;
1236 struct neighbor *nb = trgt_out_lx->target.out.nb;
1237 struct qos_queue *q = nb->queue;
1238 int sched_cmsg = 0;
1239 int krefput_nb = 0;
1241 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1242 BUG_ON(q == 0);
1244 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1245 if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
1246 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1247 return;
1249 spin_lock(&(q->qlock));
1251 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
1252 list_del(&(trgt_out_lx->target.out.rb.lh));
1253 BUG_ON(nb->conns_waiting.cnt == 0);
1254 nb->conns_waiting.cnt--;
1255 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1256 BUG_ON(q->numconns == 0);
1257 q->numconns--;
1260 BUG_ON(nb->conns_waiting.priority_sum <
1261 trgt_out_lx->target.out.rb_priority);
1262 BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
1263 nb->conns_waiting.priority_sum -=
1264 trgt_out_lx->target.out.rb_priority;
1265 q->priority_sum -= trgt_out_lx->target.out.rb_priority;
1266 trgt_out_lx->target.out.rb_priority = 0;
1268 if (list_empty(&(nb->conns_waiting.lh)) &&
1269 list_empty(&(nb->conns_waiting.lh_nextpass))) {
1270 BUG_ON(nb->conns_waiting.priority_sum != 0);
1271 BUG_ON(nb->conns_waiting.cnt != 0);
1272 } else {
1273 BUG_ON(nb->conns_waiting.cnt == 0);
1276 if (list_empty(&(nb->conns_waiting.lh)) &&
1277 list_empty(&(nb->conns_waiting.lh_nextpass)) &&
1278 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1279 nb->rb.in_queue = RB_INQUEUE_FALSE;
1280 list_del(&(nb->rb.lh));
1281 if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
1282 atomic_set(&(nb->cmsg_delay_conndata), 0);
1283 sched_cmsg = 1;
1286 krefput_nb = 1;
1288 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1289 list_empty(&(q->neighbors_waiting_nextpass)) &&
1290 q->numconns != 0);
1291 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1292 list_empty(&(q->neighbors_waiting_nextpass)) &&
1293 q->priority_sum != 0);
1295 qos_queue_set_congstatus(q);
1298 spin_unlock(&(q->qlock));
1299 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1301 if (sched_cmsg) {
1302 spin_lock_bh(&(nb->cmsg_lock));
1303 schedule_controlmsg_timer(nb);
1304 spin_unlock_bh(&(nb->cmsg_lock));
1307 kref_put(&(trgt_out_lx->ref), kreffree_bug);
1309 if (krefput_nb)
1310 kref_put(&(nb->ref), neighbor_free);
1313 static void qos_enqueue_conn(struct conn *trgt_out_lx)
1315 unsigned long iflags;
1316 struct neighbor *nb = trgt_out_lx->target.out.nb;
1317 struct qos_queue *q;
1319 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1321 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1323 if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
1324 goto out;
1326 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
1327 list_add_tail(&(trgt_out_lx->target.out.rb.lh),
1328 &(nb->conns_waiting.lh));
1329 kref_get(&(trgt_out_lx->ref));
1330 nb->conns_waiting.cnt++;
1332 q = trgt_out_lx->target.out.nb->queue;
1333 spin_lock(&(q->qlock));
1334 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1335 q->numconns++;
1336 } else {
1337 _qos_enqueue(q, &(nb->rb), ns_to_ktime(0), QOS_CALLER_NEIGHBOR,
1340 spin_unlock(&(q->qlock));
1342 out:
1343 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1346 static struct sk_buff *create_packet(struct neighbor *nb, int size,
1347 gfp_t alloc_flags)
1349 struct sk_buff *ret;
1351 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1352 nb->dev->needed_tailroom, alloc_flags);
1353 if (unlikely(ret == 0))
1354 return 0;
1356 ret->protocol = htons(ETH_P_COR);
1357 ret->dev = nb->dev;
1359 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1360 if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1361 nb->dev->dev_addr, ret->len) < 0))
1362 return 0;
1363 skb_reset_network_header(ret);
1365 return ret;
1368 struct sk_buff *create_packet_cmsg(struct neighbor *nb, int size,
1369 gfp_t alloc_flags, __u64 seqno)
1371 struct sk_buff *ret;
1372 char *dest;
1374 ret = create_packet(nb, size + 7, alloc_flags);
1375 if (unlikely(ret == 0))
1376 return 0;
1378 dest = skb_put(ret, 7);
1379 BUG_ON(dest == 0);
1381 dest[0] = PACKET_TYPE_CMSG;
1382 dest += 1;
1384 put_u48(dest, seqno);
1385 dest += 6;
1387 return ret;
1390 struct sk_buff *create_packet_conndata(struct neighbor *nb, int size,
1391 gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
1392 __u8 snd_delayed_lowbuf, __u8 flush)
1394 struct sk_buff *ret;
1395 char *dest;
1397 ret = create_packet(nb, size + 11, alloc_flags);
1398 if (unlikely(ret == 0))
1399 return 0;
1401 dest = skb_put(ret, 11);
1402 BUG_ON(dest == 0);
1404 if (flush != 0) {
1405 if (snd_delayed_lowbuf != 0) {
1406 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
1407 } else {
1408 dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
1410 } else {
1411 if (snd_delayed_lowbuf != 0) {
1412 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
1413 } else {
1414 dest[0] = PACKET_TYPE_CONNDATA;
1417 dest += 1;
1419 put_u32(dest, conn_id);
1420 dest += 4;
1421 put_u48(dest, seqno);
1422 dest += 6;
1424 return ret;
1427 void reschedule_conn_retrans_timer(struct neighbor *nb_retransconnlocked)
1429 struct conn_retrans *cr = 0;
1431 if (list_empty(&(nb_retransconnlocked->retrans_conn_list)))
1432 return;
1434 cr = container_of(nb_retransconnlocked->retrans_conn_list.next,
1435 struct conn_retrans, timeout_list);
1437 if (time_before_eq(cr->timeout, jiffies)) {
1438 qos_enqueue(nb_retransconnlocked->queue,
1439 &(nb_retransconnlocked->rb_cr), ns_to_ktime(0),
1440 QOS_CALLER_CONN_RETRANS);
1441 } else {
1442 if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
1443 cr->timeout) == 0) {
1444 kref_get(&(nb_retransconnlocked->ref));
1450 * warning:
1451 * caller must also call kref_get/put, see reschedule_conn_retrans_timer
1453 static void cancel_conn_retrans(struct neighbor *nb_retransconnlocked,
1454 struct conn *trgt_out_lx, struct conn_retrans *cr,
1455 __u64 *bytes_acked)
1457 if (unlikely(cr->state == CONN_RETRANS_ACKED))
1458 return;
1460 if (cr->state == CONN_RETRANS_SCHEDULED) {
1461 list_del(&(cr->timeout_list));
1462 } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
1463 BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
1464 if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
1465 trgt_out_lx->target.out.retrans_lowwindow--;
1468 if (cr->state != CONN_RETRANS_INITIAL)
1469 *bytes_acked += cr->length;
1471 list_del(&(cr->conn_list));
1472 cr->state = CONN_RETRANS_ACKED;
1474 kref_put(&(cr->ref), free_connretrans);
1478 * nb->retrans_conn_lock must be held when calling this
1479 * (see schedule_retransmit_conn())
1481 static void cancel_acked_conn_retrans(struct conn *trgt_out_l,
1482 __u64 *bytes_acked)
1484 __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;
1486 while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
1487 struct conn_retrans *cr = container_of(
1488 trgt_out_l->target.out.retrans_list.next,
1489 struct conn_retrans, conn_list);
1491 if (seqno_after(cr->seqno + cr->length, seqno_acked)) {
1492 if (seqno_before(cr->seqno, seqno_acked)) {
1493 *bytes_acked += seqno_clean(seqno_acked -
1494 cr->seqno);
1495 cr->length -= seqno_clean(seqno_acked -
1496 cr->seqno);
1497 cr->seqno = seqno_acked;
1499 break;
1502 cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l, cr,
1503 bytes_acked);
1506 reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
1509 void cancel_all_conn_retrans(struct conn *trgt_out_lx)
1511 struct neighbor *nb = trgt_out_lx->target.out.nb;
1512 __u64 bytes_acked = 0;
1514 spin_lock_bh(&(nb->retrans_conn_lock));
1516 while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
1517 struct conn_retrans *cr = container_of(
1518 trgt_out_lx->target.out.retrans_list.next,
1519 struct conn_retrans, conn_list);
1520 BUG_ON(cr->trgt_out_o != trgt_out_lx);
1522 cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
1525 reschedule_conn_retrans_timer(nb);
1527 spin_unlock_bh(&(nb->retrans_conn_lock));
1529 if (bytes_acked > 0)
1530 nbcongwin_data_acked(nb, bytes_acked);
1533 static void cancel_all_conn_retrans_nb(struct neighbor *nb)
1535 __u64 bytes_acked = 0;
1537 while (1) {
1538 struct conn_retrans *cr;
1540 spin_lock_bh(&(nb->retrans_conn_lock));
1542 if (list_empty(&(nb->retrans_conn_list))) {
1543 spin_unlock_bh(&(nb->retrans_conn_lock));
1544 break;
1547 cr = container_of(nb->retrans_conn_list.next,
1548 struct conn_retrans, timeout_list);
1550 kref_get(&(cr->ref));
1552 spin_unlock_bh(&(nb->retrans_conn_lock));
1555 spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
1556 spin_lock_bh(&(nb->retrans_conn_lock));
1558 if (likely(cr == container_of(nb->retrans_conn_list.next,
1559 struct conn_retrans, timeout_list)))
1560 cancel_conn_retrans(nb, cr->trgt_out_o, cr,
1561 &bytes_acked);
1563 spin_unlock_bh(&(nb->retrans_conn_lock));
1564 spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));
1566 kref_put(&(cr->ref), free_connretrans);
1569 if (bytes_acked > 0)
1570 nbcongwin_data_acked(nb, bytes_acked);
1573 static struct conn_retrans *prepare_conn_retrans(struct conn *trgt_out_l,
1574 __u64 seqno, __u32 len, __u8 snd_delayed_lowbuf,
1575 struct conn_retrans *cr_splitted, int retransconnlocked)
1577 struct neighbor *nb = trgt_out_l->target.out.nb;
1579 struct conn_retrans *cr = kmem_cache_alloc(connretrans_slab,
1580 GFP_ATOMIC);
1582 if (unlikely(cr == 0))
1583 return 0;
1585 BUG_ON(trgt_out_l->isreset != 0);
1587 memset(cr, 0, sizeof (struct conn_retrans));
1588 cr->trgt_out_o = trgt_out_l;
1589 kref_get(&(trgt_out_l->ref));
1590 cr->seqno = seqno;
1591 cr->length = len;
1592 cr->snd_delayed_lowbuf = snd_delayed_lowbuf;
1593 kref_init(&(cr->ref));
1595 kref_get(&(cr->ref));
1596 if (retransconnlocked == 0)
1597 spin_lock_bh(&(nb->retrans_conn_lock));
1599 if (cr_splitted != 0)
1600 list_add(&(cr->conn_list), &(cr_splitted->conn_list));
1601 else
1602 list_add_tail(&(cr->conn_list),
1603 &(cr->trgt_out_o->target.out.retrans_list));
1605 if (retransconnlocked == 0)
1606 spin_unlock_bh(&(nb->retrans_conn_lock));
1608 return cr;
1611 #define RC_SENDRETRANS_OK 0
1612 #define RC_SENDRETRANS_OOM 1
1613 #define RC_SENDRETRANS_QUEUEFULL 2
1614 #define RC_SENDRETRANS_QUEUEFULLDROPPED 3
1616 static int __send_retrans(struct neighbor *nb, struct conn *trgt_out_l,
1617 struct conn_retrans *cr, __u64 *bytes_sent)
1619 __u8 flush = 0;
1621 BUG_ON(cr->length == 0);
1623 if (trgt_out_l->flush != 0 && seqno_eq(cr->seqno + cr->length,
1624 trgt_out_l->target.out.seqno_nextsend) &&
1625 trgt_out_l->data_buf.read_remaining == 0)
1626 flush = 1;
1628 if (send_conndata_as_skb(nb, cr->length)) {
1629 struct sk_buff *skb;
1630 char *dst;
1631 int rc;
1633 skb = create_packet_conndata(nb, cr->length, GFP_ATOMIC,
1634 trgt_out_l->target.out.conn_id, cr->seqno,
1635 cr->snd_delayed_lowbuf, flush);
1636 if (unlikely(skb == 0))
1637 return RC_SENDRETRANS_OOM;
1639 dst = skb_put(skb, cr->length);
1641 databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);
1643 rc = cor_dev_queue_xmit(skb, nb->queue,
1644 QOS_CALLER_CONN_RETRANS);
1645 if (rc == NET_XMIT_DROP)
1646 return RC_SENDRETRANS_QUEUEFULLDROPPED;
1647 schedule_retransmit_conn(cr, 1, 0);
1648 if (rc != NET_XMIT_SUCCESS)
1649 return RC_SENDRETRANS_QUEUEFULL;
1651 } else {
1652 struct control_msg_out *cm;
1653 char *buf;
1655 buf = kmalloc(cr->length, GFP_ATOMIC);
1656 if (unlikely(buf == 0))
1657 return RC_SENDRETRANS_OOM;
1659 cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
1660 if (unlikely(cm == 0)) {
1661 kfree(buf);
1662 return RC_SENDRETRANS_OOM;
1665 databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);
1667 send_conndata(cm, trgt_out_l->target.out.conn_id,
1668 cr->seqno, buf, buf, cr->length,
1669 cr->snd_delayed_lowbuf, flush,
1670 trgt_out_l->is_highlatency, cr);
1673 *bytes_sent += cr->length;
1675 return RC_SENDRETRANS_OK;
1678 static int _send_retrans_splitcr_ifneeded(struct neighbor *nb_retransconnlocked,
1679 struct conn *trgt_out_l, struct conn_retrans *cr)
1681 __u32 targetmss = mss_conndata(nb_retransconnlocked,
1682 trgt_out_l->is_highlatency != 0);
1683 __u64 windowlimit = seqno_clean(
1684 trgt_out_l->target.out.seqno_windowlimit -
1685 cr->seqno);
1686 __u32 maxsize = targetmss;
1687 if (windowlimit < maxsize)
1688 maxsize = windowlimit;
1690 if (unlikely(cr->length > maxsize)) {
1691 struct conn_retrans *cr2 = prepare_conn_retrans(trgt_out_l,
1692 cr->seqno + maxsize, cr->length - maxsize,
1693 cr->snd_delayed_lowbuf, cr, 1);
1694 if (unlikely(cr2 == 0))
1695 return RC_SENDRETRANS_OOM;
1697 cr2->timeout = cr->timeout;
1699 list_add(&(cr2->timeout_list),
1700 &(nb_retransconnlocked->retrans_conn_list));
1701 cr2->state = CONN_RETRANS_SCHEDULED;
1703 cr->length = maxsize;
1706 return RC_SENDRETRANS_OK;
1709 static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr,
1710 __u64 *bytes_sent)
1713 struct conn *trgt_out_o = cr->trgt_out_o;
1714 int rc = RC_SENDRETRANS_OK;
1716 spin_lock_bh(&(trgt_out_o->rcv_lock));
1718 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1719 BUG_ON(trgt_out_o->target.out.nb != nb);
1721 spin_lock_bh(&(nb->retrans_conn_lock));
1722 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1723 spin_unlock_bh(&(nb->retrans_conn_lock));
1724 goto out;
1727 BUG_ON(trgt_out_o->isreset != 0);
1729 BUG_ON(seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));
1731 if (seqno_after_eq(cr->seqno,
1732 trgt_out_o->target.out.seqno_windowlimit)) {
1733 BUG_ON(cr->state != CONN_RETRANS_SENDING);
1734 cr->state = CONN_RETRANS_LOWWINDOW;
1735 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1736 trgt_out_o->target.out.retrans_lowwindow++;
1738 spin_unlock_bh(&(nb->retrans_conn_lock));
1739 goto out;
1742 rc = _send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);
1744 spin_unlock_bh(&(nb->retrans_conn_lock));
1746 kref_get(&(trgt_out_o->ref));
1748 if (rc == RC_SENDRETRANS_OK)
1749 rc = __send_retrans(nb, trgt_out_o, cr, bytes_sent);
1751 if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
1752 spin_lock_bh(&(nb->retrans_conn_lock));
1753 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1754 } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
1755 if (rc == RC_SENDRETRANS_OOM)
1756 cr->timeout = jiffies + 1;
1757 list_add(&(cr->timeout_list), &(nb->retrans_conn_list));
1758 cr->state = CONN_RETRANS_SCHEDULED;
1759 } else {
1760 BUG();
1762 spin_unlock_bh(&(nb->retrans_conn_lock));
1765 out:
1766 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1768 kref_put(&(trgt_out_o->ref), free_conn);
1770 return (rc == RC_SENDRETRANS_OOM ||
1771 rc == RC_SENDRETRANS_QUEUEFULL ||
1772 rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
1775 static int send_retrans(struct neighbor *nb, int *sent)
1777 int queuefull = 0;
1778 int nbstate = get_neigh_state(nb);
1779 __u64 bytes_sent = 0;
1781 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
1782 return QOS_RESUME_DONE;
1783 } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1785 * cancel_all_conn_retrans_nb should not be needed, because
1786 * reset_all_conns calls cancel_all_conn_retrans
1788 cancel_all_conn_retrans_nb(nb);
1789 return QOS_RESUME_DONE;
1792 while (1) {
1793 struct conn_retrans *cr = 0;
1795 spin_lock_bh(&(nb->retrans_conn_lock));
1797 if (list_empty(&(nb->retrans_conn_list))) {
1798 spin_unlock_bh(&(nb->retrans_conn_lock));
1799 break;
1802 cr = container_of(nb->retrans_conn_list.next,
1803 struct conn_retrans, timeout_list);
1805 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1807 if (time_after(cr->timeout, jiffies)) {
1808 spin_unlock_bh(&(nb->retrans_conn_lock));
1809 break;
1812 kref_get(&(cr->ref));
1813 list_del(&(cr->timeout_list));
1814 cr->state = CONN_RETRANS_SENDING;
1816 spin_unlock_bh(&(nb->retrans_conn_lock));
1818 queuefull = _send_retrans(nb, cr, &bytes_sent);
1819 kref_put(&(cr->ref), free_connretrans);
1820 if (queuefull) {
1821 break;
1822 } else {
1823 *sent = 1;
1827 if (bytes_sent > 0)
1828 nbcongwin_data_retransmitted(nb, bytes_sent);
1830 return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
1833 void retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
1835 struct neighbor *nb = container_of(retrans_conn_timer,
1836 struct neighbor, retrans_conn_timer);
1837 qos_enqueue(nb->queue, &(nb->rb_cr), ns_to_ktime(0),
1838 QOS_CALLER_CONN_RETRANS);
1839 kref_put(&(nb->ref), neighbor_free);
1842 static void conn_ack_ooo_rcvd_splitcr(struct conn *trgt_out_l,
1843 struct conn_retrans *cr, __u64 seqno_ooo, __u32 length,
1844 __u64 *bytes_acked)
1846 struct conn_retrans *cr2;
1847 __u64 seqno_cr2start;
1848 __u32 oldcrlength = cr->length;
1850 if (cr->state != CONN_RETRANS_SCHEDULED &&
1851 cr->state != CONN_RETRANS_LOWWINDOW)
1852 return;
1854 seqno_cr2start = seqno_ooo+length;
1855 cr2 = prepare_conn_retrans(trgt_out_l, seqno_cr2start,
1856 seqno_clean(cr->seqno + cr->length - seqno_cr2start),
1857 cr->snd_delayed_lowbuf, cr, 1);
1859 if (unlikely(cr2 == 0))
1860 return;
1862 BUG_ON(cr2->length > cr->length);
1864 cr2->timeout = cr->timeout;
1865 cr2->state = cr->state;
1867 if (cr->state != CONN_RETRANS_SCHEDULED)
1868 list_add(&(cr2->timeout_list), &(cr->timeout_list));
1870 BUG_ON(seqno_clean(seqno_ooo - cr->seqno) > cr->length);
1872 cr->length -= seqno_clean(seqno_ooo - cr->seqno);
1873 BUG_ON(cr->length + length + cr2->length != oldcrlength);
1875 *bytes_acked += length;
1878 void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
1879 struct conn *trgt_out, __u64 seqno_ooo, __u32 length,
1880 __u64 *bytes_acked)
1882 struct list_head *curr;
1884 if (unlikely(length == 0))
1885 return;
1887 spin_lock_bh(&(trgt_out->rcv_lock));
1889 if (unlikely(trgt_out->targettype != TARGET_OUT))
1890 goto out;
1891 if (unlikely(trgt_out->target.out.nb != nb))
1892 goto out;
1893 if (unlikely(trgt_out->target.out.conn_id != conn_id))
1894 goto out;
1896 kref_get(&(nb->ref));
1897 spin_lock_bh(&(nb->retrans_conn_lock));
1899 curr = trgt_out->target.out.retrans_list.next;
1901 while (curr != &(trgt_out->target.out.retrans_list)) {
1902 struct conn_retrans *cr = container_of(curr,
1903 struct conn_retrans, conn_list);
1905 int ack_covers_start = seqno_after_eq(cr->seqno, seqno_ooo);
1906 int ack_covers_end = seqno_before_eq(cr->seqno + cr->length,
1907 seqno_ooo + length);
1909 curr = curr->next;
1911 if (seqno_before(cr->seqno + cr->length, seqno_ooo))
1912 continue;
1914 if (seqno_after(cr->seqno, seqno_ooo + length))
1915 break;
1917 if (likely(ack_covers_start && ack_covers_end)) {
1918 cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
1919 reschedule_conn_retrans_timer(nb);
1920 } else if (ack_covers_start) {
1921 __u32 diff = seqno_ooo + length - cr->seqno -
1922 cr->length;
1923 BUG_ON(diff >= cr->length);
1924 cr->seqno += diff;
1925 cr->length -= diff;
1926 *bytes_acked += diff;
1927 } else if (ack_covers_end) {
1928 __u32 diff = seqno_ooo + length - cr->seqno;
1929 BUG_ON(diff >= length);
1930 cr->length -= diff;
1931 *bytes_acked += diff;
1932 } else {
1933 conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
1934 length, bytes_acked);
1935 break;
1939 if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
1940 trgt_out->target.out.seqno_acked =
1941 trgt_out->target.out.seqno_nextsend;
1942 } else {
1943 struct conn_retrans *cr = container_of(
1944 trgt_out->target.out.retrans_list.next,
1945 struct conn_retrans, conn_list);
1946 if (seqno_after(cr->seqno, trgt_out->target.out.seqno_acked))
1947 trgt_out->target.out.seqno_acked = cr->seqno;
1950 spin_unlock_bh(&(nb->retrans_conn_lock));
1951 kref_put(&(nb->ref), neighbor_free);
1953 out:
1954 spin_unlock_bh(&(trgt_out->rcv_lock));
1957 static void _conn_ack_rcvd_nosendwin(struct conn *trgt_out_l)
1959 if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
1960 trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
1961 trgt_out_l->bufsize.state = BUFSIZE_NOACTION;
1963 if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
1964 trgt_out_l->bufsize.act.noact.bytesleft = max(
1965 trgt_out_l->bufsize.act.noact.bytesleft,
1966 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1968 trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
1969 trgt_out_l->bufsize.ignore_rcv_lowbuf,
1970 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1974 * nb->retrans_conn_lock must be held when calling this
1975 * (see schedule_retransmit_conn())
1977 static void reschedule_lowwindow_retrans(struct conn *trgt_out_l)
1979 struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
1980 int cnt = 0;
1982 while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
1983 struct conn_retrans *cr;
1985 if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
1986 BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
1987 65535);
1988 trgt_out_l->target.out.retrans_lowwindow = 0;
1989 break;
1992 cr = container_of(lh, struct conn_retrans, conn_list);
1994 if (seqno_after_eq(cr->seqno,
1995 trgt_out_l->target.out.seqno_windowlimit)) {
1996 break;
1999 if (cr->state == CONN_RETRANS_LOWWINDOW)
2000 schedule_retransmit_conn(cr, 1, 1);
2002 lh = lh->next;
2003 cnt++;
2007 void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
2008 __u64 seqno, int setwindow, __u8 window, __u64 *bytes_acked)
2010 int seqno_advanced = 0;
2011 int window_enlarged = 0;
2013 spin_lock_bh(&(trgt_out->rcv_lock));
2015 if (unlikely(trgt_out->isreset != 0))
2016 goto out;
2017 if (unlikely(trgt_out->targettype != TARGET_OUT))
2018 goto out;
2019 if (unlikely(trgt_out->target.out.nb != nb))
2020 goto out;
2021 if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
2022 goto out;
2024 if (unlikely(seqno_after(seqno, trgt_out->target.out.seqno_nextsend) ||
2025 seqno_before(seqno, trgt_out->target.out.seqno_acked)))
2026 goto out;
2028 if (setwindow) {
2029 __u64 windowdec = dec_log_64_7(window);
2030 if (likely(seqno_after(seqno,
2031 trgt_out->target.out.seqno_acked)) ||
2032 seqno_after(seqno + windowdec,
2033 trgt_out->target.out.seqno_windowlimit)) {
2034 trgt_out->target.out.seqno_windowlimit = seqno +
2035 windowdec;
2036 window_enlarged = 1;
2040 if (seqno_after(seqno, trgt_out->target.out.seqno_acked))
2041 seqno_advanced = 1;
2043 if (seqno_advanced == 0 && window_enlarged == 0)
2044 goto out;
2046 kref_get(&(nb->ref));
2047 spin_lock_bh(&(nb->retrans_conn_lock));
2049 if (seqno_advanced) {
2050 trgt_out->target.out.seqno_acked = seqno;
2051 cancel_acked_conn_retrans(trgt_out, bytes_acked);
2054 if (window_enlarged)
2055 reschedule_lowwindow_retrans(trgt_out);
2057 spin_unlock_bh(&(nb->retrans_conn_lock));
2058 kref_put(&(nb->ref), neighbor_free);
2060 if (seqno_advanced)
2061 databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);
2063 if (seqno_eq(trgt_out->target.out.seqno_acked,
2064 trgt_out->target.out.seqno_nextsend))
2065 _conn_ack_rcvd_nosendwin(trgt_out);
2067 out:
2068 if (seqno_advanced || window_enlarged)
2069 flush_buf(trgt_out);
2071 spin_unlock_bh(&(trgt_out->rcv_lock));
2073 wake_sender(trgt_out);
static void try_combine_conn_retrans_prev(struct neighbor *nb_retransconnlocked,
		struct conn *trgt_out_lx, struct conn_retrans *cr)
{
	struct conn_retrans *cr_prev;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
		return;

	cr_prev = container_of(cr->conn_list.prev, struct conn_retrans,
			conn_list);

	if (cr_prev->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_prev->timeout != cr->timeout)
		return;
	if (!seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
		return;

	cr->seqno -= cr_prev->length;
	cr->length += cr_prev->length;

	cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
			&bytes_dummyacked);
}

static void try_combine_conn_retrans_next(struct neighbor *nb_retranslocked,
		struct conn *trgt_out_lx, struct conn_retrans *cr)
{
	struct conn_retrans *cr_next;
	__u64 bytes_dummyacked = 0;

	BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);

	if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
		return;

	cr_next = container_of(cr->conn_list.next, struct conn_retrans,
			conn_list);

	if (cr_next->state != CONN_RETRANS_SCHEDULED)
		return;
	if (cr_next->timeout != cr->timeout)
		return;
	if (!seqno_eq(cr->seqno + cr->length, cr_next->seqno))
		return;

	cr->length += cr_next->length;

	cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
			&bytes_dummyacked);
}
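
/**
 * schedule_retransmit_conn - put cr on the retransmit timeout list of its
 * neighbor
 *
 * The timeout is computed from the neighbor's retransmit latency estimate and
 * the maximum remote ack delay. If the list was empty, the retransmit timer is
 * rescheduled; otherwise the new entry is combined with adjacent scheduled
 * entries where possible. Entries that have already been acked are skipped and
 * entries coming from state CONN_RETRANS_LOWWINDOW decrement the
 * retrans_lowwindow counter. connlocked and nbretransconn_locked indicate
 * which locks the caller already holds.
 */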
void schedule_retransmit_conn(struct conn_retrans *cr, int connlocked,
		int nbretransconn_locked)
{
	struct conn *trgt_out_o = cr->trgt_out_o;
	struct neighbor *nb;
	int first;

	if (connlocked == 0)
		spin_lock_bh(&(trgt_out_o->rcv_lock));

	BUG_ON(trgt_out_o->targettype != TARGET_OUT);
	nb = trgt_out_o->target.out.nb;

	cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
			atomic_read(&(nb->latency_stddev_retrans_us)),
			atomic_read(&(nb->max_remote_ackconn_delay_us)));

	if (nbretransconn_locked == 0)
		spin_lock_bh(&(nb->retrans_conn_lock));

	kref_get(&(nb->ref));

	BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);

	if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
		goto out;
	} else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
		BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
		if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
			trgt_out_o->target.out.retrans_lowwindow--;
	}

	first = unlikely(list_empty(&(nb->retrans_conn_list)));
	list_add_tail(&(cr->timeout_list), &(nb->retrans_conn_list));
	cr->state = CONN_RETRANS_SCHEDULED;

	if (unlikely(first)) {
		reschedule_conn_retrans_timer(nb);
	} else {
		try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
		try_combine_conn_retrans_next(nb, trgt_out_o, cr);
	}

out:
	if (nbretransconn_locked == 0)
		spin_unlock_bh(&(nb->retrans_conn_lock));

	kref_put(&(nb->ref), neighbor_free);

	if (connlocked == 0)
		spin_unlock_bh(&(trgt_out_o->rcv_lock));
}
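
/*
 * Send one chunk of buffered connection data as a separate skb. A conn_retrans
 * entry is prepared before transmission; if cor_dev_queue_xmit() returns
 * NET_XMIT_DROP, the data is pushed back into the buffer and the retransmit
 * entry is cancelled. Otherwise seqno_nextsend advances, the congestion window
 * accounting is updated and the retransmit is scheduled.
 */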
static int _flush_out_skb(struct conn *trgt_out_lx, __u32 len,
		__u8 snd_delayed_lowbuf)
{
	struct neighbor *nb = trgt_out_lx->target.out.nb;

	__u64 seqno;
	struct conn_retrans *cr;
	struct sk_buff *skb;
	char *dst;
	__u8 flush = 0;
	int rc;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	seqno = trgt_out_lx->target.out.seqno_nextsend;
	skb = create_packet_conndata(trgt_out_lx->target.out.nb, len,
			GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
			snd_delayed_lowbuf, flush);
	if (unlikely(skb == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
			0, 0);
	if (unlikely(cr == 0)) {
		kfree_skb(skb);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	dst = skb_put(skb, len);

	databuf_pull(trgt_out_lx, dst, len);

	rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
	if (rc == NET_XMIT_DROP) {
		databuf_unpull(trgt_out_lx, len);
		spin_lock_bh(&(nb->retrans_conn_lock));
		cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
		spin_unlock_bh(&(nb->retrans_conn_lock));
		kref_put(&(cr->ref), free_connretrans);
		return RC_FLUSH_CONN_OUT_CONG;
	}

	trgt_out_lx->target.out.seqno_nextsend += len;
	nbcongwin_data_sent(nb, len);
	schedule_retransmit_conn(cr, 1, 0);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		update_src_sock_sndspeed(trgt_out_lx, len);

	kref_put(&(cr->ref), free_connretrans);

	return (rc == NET_XMIT_SUCCESS) ?
			RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
}
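
/*
 * Send one chunk of buffered connection data as part of a control message
 * instead of a separate skb. The payload is copied into a temporary buffer and
 * handed to send_conndata() together with the prepared conn_retrans entry.
 */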
static int _flush_out_conndata(struct conn *trgt_out_lx, __u32 len,
		__u8 snd_delayed_lowbuf)
{
	__u64 seqno;
	struct control_msg_out *cm;
	struct conn_retrans *cr;
	char *buf;
	__u8 flush = 0;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	buf = kmalloc(len, GFP_ATOMIC);

	if (unlikely(buf == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cm = alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
	if (unlikely(cm == 0)) {
		kfree(buf);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	seqno = trgt_out_lx->target.out.seqno_nextsend;

	cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
			0, 0);
	if (unlikely(cr == 0)) {
		kfree(buf);
		free_control_msg(cm);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	databuf_pull(trgt_out_lx, buf, len);
	trgt_out_lx->target.out.seqno_nextsend += len;
	nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		update_src_sock_sndspeed(trgt_out_lx, len);

	send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf, len,
			snd_delayed_lowbuf, flush, trgt_out_lx->is_highlatency,
			cr);

	return RC_FLUSH_CONN_OUT_OK;
}
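
/*
 * Returns 1 when the incoming side of the connection should not accept more
 * data: the announced window is already exceeded, smaller than
 * WINDOW_ENCODE_MIN, or the buffered data exceeds half of the remaining
 * window.
 */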
int srcin_buflimit_reached(struct conn *src_in_lx)
{
	__u64 window_left;

	if (unlikely(seqno_before(src_in_lx->source.in.window_seqnolimit,
			src_in_lx->source.in.next_seqno)))
		return 1;

	window_left = seqno_clean(src_in_lx->source.in.window_seqnolimit -
			src_in_lx->source.in.next_seqno);

	if (window_left < WINDOW_ENCODE_MIN)
		return 1;

	if (window_left/2 < src_in_lx->data_buf.read_remaining)
		return 1;

	return 0;
}
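
/*
 * Round maxsend_left down to a convenient send length: values below 128 are
 * returned unchanged, values up to 8191 are rounded down to a power of two
 * and larger values are rounded down to a multiple of 4096. For example,
 * 3000 becomes 2048 and 10000 becomes 8192.
 */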
static __u32 maxsend_left_to_len(__u32 maxsend_left)
{
	__u32 i;
	if (maxsend_left < 128)
		return maxsend_left;

	for (i=128;i<4096;) {
		if (i*2 > maxsend_left)
			return i;
		i = i*2;
	}

	return maxsend_left - maxsend_left%4096;
}
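
/*
 * Returns 1 if sending sndlen bytes gets close to the send window limit:
 * either the window cannot hold sndlen at all, or the window remaining after
 * this send would drop below 1/4 (high latency connections) respectively 1/8
 * (low latency connections) of the bytes that would then be unacked.
 */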
static int seqno_low_sendlimit(struct conn *trgt_out_lx, __u64 windowlimit,
		__u32 sndlen)
{
	__u64 bytes_ackpending;

	BUG_ON(seqno_before(trgt_out_lx->target.out.seqno_nextsend,
			trgt_out_lx->target.out.seqno_acked));

	bytes_ackpending = seqno_clean(trgt_out_lx->target.out.seqno_nextsend -
			trgt_out_lx->target.out.seqno_acked);

	if (windowlimit <= sndlen)
		return 1;

	if (unlikely(bytes_ackpending + sndlen < bytes_ackpending))
		return 0;

	if (trgt_out_lx->is_highlatency != 0)
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 4)
				? 1 : 0;
	else
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 8)
				? 1 : 0;
}

static void _flush_out_ignore_lowbuf(struct conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}

static __u64 get_windowlimit(struct conn *trgt_out_lx)
{
	if (unlikely(seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_nextsend)))
		return 0;

	return seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_nextsend);
}
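
/**
 * _flush_out - send buffered data of trgt_out_lx, at most maxsend bytes
 *
 * Full targetmss sized chunks are sent in a loop as long as the send window,
 * the congestion window and maxsend permit. A smaller remainder is only sent
 * when the connection is being flushed or the source-specific checks indicate
 * that waiting for more data is not worthwhile, and only as far as the window
 * allows. Data is sent either as skbs or as conndata control messages,
 * depending on send_conndata_as_skb(). If the neighbor is stalled, the
 * connection is put on nb->stalledconn_list for later resumption and
 * RC_FLUSH_CONN_OUT_NBNOTACTIVE is returned.
 */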
static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos, int maxsend_forcedelay)
{
	struct neighbor *nb = trgt_out_lx->target.out.nb;

	__u32 targetmss;

	int nbstate;

	__u8 snd_delayed_lowbuf = trgt_out_lx->target.out.windowlimit_reached;

	__u32 maxsend_left = maxsend;

	trgt_out_lx->target.out.windowlimit_reached = 0;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->target.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (from_qos == 0 && qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	get_conn_idletime(trgt_out_lx);

	spin_lock_bh(&(nb->stalledconn_lock));
	nbstate = get_neigh_state(nb);
	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next != 0);
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next == 0);

		if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
			kref_get(&(trgt_out_lx->ref));
			list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
					&(nb->stalledconn_list));
		}
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
			get_windowlimit(trgt_out_l),
			trgt_out_l->data_buf.read_remaining); */

	targetmss = mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, targetmss)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (likely(send_conndata_as_skb(nb, targetmss)))
			rc = _flush_out_skb(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left >= 65536 || (
					maxsend_left == maxsend &&
					maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0 &&
					!maxsend_forcedelay)) {
				len = maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				srcin_buflimit_reached(trgt_out_lx)
				== 0 && (
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cpacket_write_allowed(trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, len)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (len > windowlimit) {
			len = windowlimit;
			_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (send_conndata_as_skb(nb, len))
			rc = _flush_out_skb(trgt_out_lx, len,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, len,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}
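
/*
 * Flush with an effectively unlimited maxsend (1 << 30). If sending stops
 * due to congestion, the maxsend limit or an allocation failure, the
 * connection is put on the qos queue so that sending is retried later.
 */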
int flush_out(struct conn *trgt_out_lx, __u32 *sent)
{
	int rc = _flush_out(trgt_out_lx, 1 << 30, sent, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_CONG || rc == RC_FLUSH_CONN_OUT_MAXSENT ||
			rc == RC_FLUSH_CONN_OUT_OOM)
		qos_enqueue_conn(trgt_out_lx);

	return rc;
}
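
/*
 * Work item that retries flush_out() for all connections that were parked on
 * nb->stalledconn_list while the neighbor was stalled. It stops early when
 * flush_out() reports RC_FLUSH_CONN_OUT_NBNOTACTIVE, i.e. the neighbor is
 * still not active.
 */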
void resume_nbstalled_conns(struct work_struct *work)
{
	struct neighbor *nb = container_of(work, struct neighbor,
			stalledconn_work);
	int rc = RC_FLUSH_CONN_OUT_OK;

	spin_lock_bh(&(nb->stalledconn_lock));
	nb->stalledconn_work_scheduled = 0;
	while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
			list_empty(&(nb->stalledconn_list)) == 0) {
		struct list_head *lh = nb->stalledconn_list.next;
		struct conn *trgt_out = container_of(lh, struct conn,
				target.out.nbstalled_lh);
		__u32 sent = 0;
		BUG_ON(trgt_out->targettype != TARGET_OUT);
		list_del(lh);
		lh->prev = 0;
		lh->next = 0;

		spin_unlock_bh(&(nb->stalledconn_lock));

		spin_lock_bh(&(trgt_out->rcv_lock));
		if (likely(trgt_out->targettype == TARGET_OUT))
			rc = flush_out(trgt_out, &sent);
		spin_unlock_bh(&(trgt_out->rcv_lock));

		if (sent != 0)
			wake_sender(trgt_out);

		kref_put(&(trgt_out->ref), free_conn);

		spin_lock_bh(&(nb->stalledconn_lock));
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	kref_put(&(nb->ref), neighbor_free);
}

int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);
	if (unlikely(connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

void __exit cor_snd_exit1(void)
{
	flush_work(&qos_waitexit_work);
}

void __exit cor_snd_exit2(void)
{
	kmem_cache_destroy(connretrans_slab);
	connretrans_slab = 0;
}

MODULE_LICENSE("GPL");