measure ping latency including qos_resume delay
net/cor/snd.c (cor.git)
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2020 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/gfp.h>
22 #include <linux/jiffies.h>
23 #include <linux/slab.h>
25 #include "cor.h"
27 static struct kmem_cache *connretrans_slab;
29 static DEFINE_SPINLOCK(queues_lock);
30 static LIST_HEAD(queues);
32 static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
33 int from_qos, int maxsend_forcedelay);
35 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
36 ktime_t cmsg_send_start, int caller, int from_nbcongwin_resume);
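/*
 * debug helper: with DEBUG_QOS_SLOWSEND defined, _cor_dev_queue_xmit() below
 * throttles transmission to roughly one frame per jiffy (two directly after an
 * idle gap) and drops everything else with NET_XMIT_DROP, presumably to make
 * the queue-full and resume paths in this file easier to trigger during
 * testing.
 */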
39 #ifdef DEBUG_QOS_SLOWSEND
40 static DEFINE_SPINLOCK(slowsend_lock);
41 static unsigned long last_send;
44 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
46 int allowsend = 0;
47 unsigned long jiffies_tmp;
48 spin_lock_bh(&slowsend_lock);
49 jiffies_tmp = jiffies;
50 if (last_send != jiffies_tmp) {
51 if (last_send + 1 == jiffies_tmp) {
52 last_send = jiffies_tmp;
53 } else {
54 last_send = jiffies_tmp - 1;
56 allowsend = 1;
58 spin_unlock_bh(&slowsend_lock);
60 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
61 if (allowsend) {
62 return dev_queue_xmit(skb);
63 } else {
64 kfree_skb(skb);
65 return NET_XMIT_DROP;
68 #endif
70 static void free_connretrans(struct kref *ref)
72 struct conn_retrans *cr = container_of(ref, struct conn_retrans, ref);
73 struct conn *cn = cr->trgt_out_o;
75 BUG_ON(cr->state != CONN_RETRANS_ACKED);
77 kmem_cache_free(connretrans_slab, cr);
78 kref_put(&(cn->ref), free_conn);
81 void free_qos(struct kref *ref)
83 struct qos_queue *q = container_of(ref, struct qos_queue, ref);
84 kfree(q);
88 static void qos_queue_set_congstatus(struct qos_queue *q_locked);
90 /**
91 * neighbor congestion window:
92 * increment by 4096 every round trip if more than 2/3 of cwin is used
94 * in case of packet loss decrease by 1/4:
95 * - <= 1/8 immediately and
96 * - <= 1/4 during the next round trip
98 * in case of multiple packet loss events, do not decrement more than once per
99 * round trip
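 *
 * worked example (illustrative numbers only): with cwin at 64 KiB and more
 * than ~43 KiB (2/3) of it in flight, one round trip worth of acks grows cwin
 * by about 4 KiB to 68 KiB. A loss event shrinks it by at most 8 KiB (1/8)
 * right away and by up to 16 KiB in total (1/4, i.e. down to 48 KiB) over the
 * following round trip; further losses within that round trip cause no
 * additional shrinking.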
102 #ifdef COR_NBCONGWIN
104 /*extern __u64 get_bufspace_used(void);
106 static void print_conn_bufstats(struct neighbor *nb)
108 / * not threadsafe, but this is only for debugging... * /
109 __u64 totalsize = 0;
110 __u64 read_remaining = 0;
111 __u32 numconns = 0;
112 struct list_head *lh;
113 unsigned long iflags;
115 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
117 lh = nb->conns_waiting.lh.next;
118 while (lh != &(nb->conns_waiting.lh)) {
119 struct conn *cn = container_of(lh, struct conn,
120 target.out.rb.lh);
121 totalsize += cn->data_buf.datasize;
122 read_remaining += cn->data_buf.read_remaining;
123 lh = lh->next;
126 lh = nb->conns_waiting.lh_nextpass.next;
127 while (lh != &(nb->conns_waiting.lh_nextpass)) {
128 struct conn *cn = container_of(lh, struct conn,
129 target.out.rb.lh);
130 totalsize += cn->data_buf.datasize;
131 read_remaining += cn->data_buf.read_remaining;
132 lh = lh->next;
135 numconns = nb->conns_waiting.cnt;
137 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
139 printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining, numconns);
140 } */
142 static void nbcongwin_data_retransmitted(struct neighbor *nb, __u64 bytes_sent)
144 __u64 min_cwin = mss_conndata(nb, 0)*2 << NBCONGWIN_SHIFT;
145 __u64 cwin;
147 unsigned long iflags;
149 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
151 cwin = atomic64_read(&(nb->nbcongwin.cwin));
153 /* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
154 get_bufspace_used());
155 print_conn_bufstats(nb); */
157 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
158 BUG_ON(cwin >= U64_MAX/1024);
160 if (bytes_sent > 1024)
161 bytes_sent = 1024;
163 if (nb->nbcongwin.cwin_shrinkto == cwin) {
164 if (bytes_sent > 512) {
165 cwin -= cwin/16;
166 } else {
167 cwin -= (bytes_sent * cwin) / (1024 * 8);
169 if (cwin < min_cwin)
170 cwin = min_cwin;
171 atomic64_set(&(nb->nbcongwin.cwin), cwin);
174 nb->nbcongwin.cwin_shrinkto -=
175 (bytes_sent * nb->nbcongwin.cwin_shrinkto) / (1024 * 8);
177 nb->nbcongwin.cwin_shrinkto = max(nb->nbcongwin.cwin_shrinkto,
178 cwin - cwin/8);
180 if (nb->nbcongwin.cwin_shrinkto < min_cwin)
181 nb->nbcongwin.cwin_shrinkto = min_cwin;
183 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
186 static __u64 nbcongwin_update_cwin(struct neighbor *nb_cwlocked,
187 __u64 data_intransit, __u64 bytes_acked)
189 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
190 __u32 INCR_PER_RTT = 4096;
192 __u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));
194 __u64 cwin_tmp;
195 __u64 incrby;
197 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
198 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
199 if (unlikely(shrinkby > cwin))
200 cwin = 0;
201 else
202 cwin -= shrinkby;
204 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
205 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
209 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
210 goto out;
212 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
214 if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
215 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
216 cwin_tmp / CWIN_MUL / CWIN_MUL);
217 else if (unlikely(bytes_acked >=
218 U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
219 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
220 cwin_tmp / CWIN_MUL);
221 else
222 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
223 CWIN_MUL, cwin_tmp);
225 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
227 if (unlikely(cwin + incrby < cwin))
228 cwin = U64_MAX;
229 else
230 cwin += incrby;
232 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
233 nb_cwlocked->nbcongwin.cwin_shrinkto))
234 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
235 else
236 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
238 out:
239 atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);
241 return cwin;
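/*
 * rough reading of the arithmetic above (illustrative, using the constants in
 * this function): cwin, cwin_shrinkto and cwin_tmp are kept in fixed point,
 * shifted left by NBCONGWIN_SHIFT (CWIN_MUL), so "cwin >> NBCONGWIN_SHIFT" is
 * the window in bytes. Per ack the increment is approximately
 *
 *	incrby_bytes = bytes_acked * INCR_PER_RTT / cwin_bytes
 *
 * e.g. with cwin_bytes = 65536 an ack of 8192 bytes adds about
 * 8192 * 4096 / 65536 = 512 bytes; summed over one round trip (bytes_acked
 * adding up to roughly cwin_bytes) this yields the "increment by 4096 per
 * round trip" from the header comment. The three div64_u64() branches only
 * differ in where the CWIN_MUL factors are applied, to avoid 64 bit overflow
 * for very large windows.
 */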
244 void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
246 unsigned long iflags;
247 struct qos_queue *q = nb->queue;
248 __u64 data_intransit;
249 __u64 cwin;
251 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
253 data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));
255 cwin = nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
257 BUG_ON(bytes_acked > data_intransit);
258 atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
259 data_intransit -= bytes_acked;
261 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
262 goto out_sendnok;
264 spin_lock(&(q->qlock));
265 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
266 if (nb->conns_waiting.cnt == 0) {
267 nb->rb.in_queue = RB_INQUEUE_FALSE;
268 } else {
269 _qos_enqueue(q, &(nb->rb), ns_to_ktime(0),
270 QOS_CALLER_NEIGHBOR, 1);
273 spin_unlock(&(q->qlock));
276 out_sendnok:
277 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
280 static void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
282 atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
285 static int nbcongwin_send_allowed(struct neighbor *nb)
287 unsigned long iflags;
288 int ret = 1;
289 struct qos_queue *q = nb->queue;
290 int krefput_queue = 0;
292 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
293 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
294 return 1;
296 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
298 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
299 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
300 goto out_ok;
302 ret = 0;
304 spin_lock(&(q->qlock));
305 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
306 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
307 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
308 list_del(&(nb->rb.lh));
309 kref_put(&(nb->ref), kreffree_bug);
310 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
311 BUG_ON(q->numconns < nb->conns_waiting.cnt);
312 q->numconns -= nb->conns_waiting.cnt;
313 q->priority_sum -= nb->conns_waiting.priority_sum;
314 krefput_queue = 1;
316 qos_queue_set_congstatus(q);
317 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
318 } else {
319 BUG();
321 spin_unlock(&(q->qlock));
323 if (krefput_queue != 0)
324 kref_put(&(q->ref), free_qos);
326 out_ok:
327 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
329 return ret;
332 #else
334 static inline void nbcongwin_data_retransmitted(struct neighbor *nb,
335 __u64 bytes_sent)
339 static inline void nbcongwin_data_acked(struct neighbor *nb, __u64 bytes_acked)
343 static inline void nbcongwin_data_sent(struct neighbor *nb, __u32 bytes_sent)
347 static inline int nbcongwin_send_allowed(struct neighbor *nb)
349 return 1;
352 #endif
354 static __u64 _resume_conns_maxsend(struct qos_queue *q, struct conn *trgt_out_l,
355 __u32 newpriority, int *maxsend_forcedelay)
357 unsigned long iflags;
359 struct neighbor *nb = trgt_out_l->target.out.nb;
360 __u32 oldpriority = trgt_out_l->target.out.rb_priority;
361 __u64 priority_sum;
362 __u32 numconns;
364 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
365 spin_lock(&(q->qlock));
367 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
368 BUG_ON(q->priority_sum < oldpriority);
369 nb->conns_waiting.priority_sum -= oldpriority;
370 q->priority_sum -= oldpriority;
372 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
373 nb->conns_waiting.priority_sum);
374 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
375 nb->conns_waiting.priority_sum += newpriority;
376 q->priority_sum += newpriority;
378 priority_sum = q->priority_sum;
379 numconns = q->numconns;
381 spin_unlock(&(q->qlock));
382 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
384 trgt_out_l->target.out.rb_priority = newpriority;
386 if (numconns <= 4) {
387 *maxsend_forcedelay = 1;
388 return div_u64(2048LL * ((__u64) newpriority) *
389 ((__u64) numconns), priority_sum);
390 } else {
391 *maxsend_forcedelay = 0;
392 return div_u64(1024LL * ((__u64) newpriority) *
393 ((__u64) numconns), priority_sum);
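/*
 * illustrative example of the budget computed above: a connection's per-pass
 * byte budget is 1024 (or 2048 when at most 4 connections are waiting) times
 * its share of the queue, newpriority * numconns / priority_sum. With two
 * waiting connections of (made-up) priority 300 and 100:
 *
 *	conn 1: 2048 * 300 * 2 / 400 = 3072 bytes
 *	conn 2: 2048 * 100 * 2 / 400 = 1024 bytes
 *
 * with eight connections of equal priority each one simply gets 1024 bytes
 * per pass.
 */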
397 static int _resume_neighbors_nextpass(struct neighbor *nb_waitingconnslocked)
399 BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);
401 if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
402 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
403 return 1;
406 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
408 nb_waitingconnslocked->conns_waiting.lh.next =
409 nb_waitingconnslocked->conns_waiting.lh_nextpass.next;
410 nb_waitingconnslocked->conns_waiting.lh.prev =
411 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev;
412 nb_waitingconnslocked->conns_waiting.lh.next->prev =
413 &(nb_waitingconnslocked->conns_waiting.lh);
414 nb_waitingconnslocked->conns_waiting.lh.prev->next =
415 &(nb_waitingconnslocked->conns_waiting.lh);
416 nb_waitingconnslocked->conns_waiting.lh_nextpass.next =
417 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
418 nb_waitingconnslocked->conns_waiting.lh_nextpass.prev =
419 &(nb_waitingconnslocked->conns_waiting.lh_nextpass);
421 return 0;
424 static int _resume_neighbors(struct qos_queue *q, struct neighbor *nb,
425 int *progress)
427 unsigned long iflags;
429 while (1) {
430 __u32 priority;
431 __u32 maxsend;
432 int maxsend_forcedelay;
434 int rc2;
435 __u32 sent2 = 0;
437 struct conn *cn = 0;
438 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
439 if (list_empty(&(nb->conns_waiting.lh)) != 0) {
440 int done = _resume_neighbors_nextpass(nb);
441 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
442 iflags);
443 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
445 BUG_ON(nb->conns_waiting.cnt == 0);
447 cn = container_of(nb->conns_waiting.lh.next, struct conn,
448 target.out.rb.lh);
449 BUG_ON(cn->targettype != TARGET_OUT);
450 BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
451 BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
452 (nb->conns_waiting.lh.prev !=
453 &(cn->target.out.rb.lh)));
454 list_del(&(cn->target.out.rb.lh));
455 list_add_tail(&(cn->target.out.rb.lh),
456 &(nb->conns_waiting.lh_nextpass));
457 kref_get(&(cn->ref));
458 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
461 priority = refresh_conn_priority(cn, 0);
463 spin_lock_bh(&(cn->rcv_lock));
465 if (unlikely(cn->targettype != TARGET_OUT)) {
466 spin_unlock_bh(&(cn->rcv_lock));
467 continue;
470 maxsend = _resume_conns_maxsend(q, cn, priority,
471 &maxsend_forcedelay);
472 if (cn->target.out.maxsend_extra >= maxsend)
473 maxsend_forcedelay = 0;
474 maxsend += cn->target.out.maxsend_extra;
475 if (unlikely(maxsend > U32_MAX))
476 maxsend = U32_MAX;
477 if (unlikely(maxsend >= 65536))
478 maxsend_forcedelay = 0;
480 rc2 = _flush_out(cn, maxsend, &sent2, 1, maxsend_forcedelay);
482 if (rc2 == RC_FLUSH_CONN_OUT_OK ||
483 rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
484 cn->target.out.maxsend_extra = 0;
485 qos_remove_conn(cn);
486 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
487 rc2 == RC_FLUSH_CONN_OUT_OOM)) {
488 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
489 if (likely(cn->target.out.rb.in_queue !=
490 RB_INQUEUE_FALSE)) {
491 list_del(&(cn->target.out.rb.lh));
492 list_add(&(cn->target.out.rb.lh),
493 &(nb->conns_waiting.lh));
495 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
496 iflags);
497 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
498 rc2 == RC_FLUSH_CONN_OUT_OOM) {
499 cn->target.out.maxsend_extra = 0;
500 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
501 if (unlikely(maxsend - sent2 > 65535))
502 cn->target.out.maxsend_extra = 65535;
503 else
504 cn->target.out.maxsend_extra = maxsend - sent2;
507 spin_unlock_bh(&(cn->rcv_lock));
509 if (sent2 != 0) {
510 *progress = 1;
511 wake_sender(cn);
514 kref_put(&(cn->ref), free_conn);
516 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
517 rc2 == RC_FLUSH_CONN_OUT_OOM) {
518 return QOS_RESUME_CONG;
523 static int resume_neighbors(struct qos_queue *q, int *sent)
525 unsigned long iflags;
527 spin_lock_irqsave(&(q->qlock), iflags);
529 while (1) {
530 struct neighbor *nb;
531 int rc;
533 if (list_empty(&(q->neighbors_waiting)) != 0) {
534 BUG_ON(q->numconns != 0);
535 spin_unlock_irqrestore(&(q->qlock), iflags);
536 return QOS_RESUME_DONE;
538 BUG_ON(q->numconns == 0);
540 nb = container_of(q->neighbors_waiting.next, struct neighbor,
541 rb.lh);
543 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
544 BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
545 BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
546 (q->neighbors_waiting.prev != &(nb->rb.lh)));
548 kref_get(&(nb->ref));
550 spin_unlock_irqrestore(&(q->qlock), iflags);
552 atomic_set(&(nb->cmsg_delay_conndata), 1);
554 rc = _resume_neighbors(q, nb, sent);
555 if (rc == QOS_RESUME_CONG) {
556 kref_put(&(nb->ref), neighbor_free);
557 return QOS_RESUME_CONG;
560 atomic_set(&(nb->cmsg_delay_conndata), 0);
561 spin_lock_bh(&(nb->cmsg_lock));
562 schedule_controlmsg_timer(nb);
563 spin_unlock_bh(&(nb->cmsg_lock));
565 spin_lock_irqsave(&(q->qlock), iflags);
566 if (rc == QOS_RESUME_DONE) {
567 if (nb->conns_waiting.cnt == 0 &&
568 nb->rb.in_queue == RB_INQUEUE_TRUE) {
569 nb->rb.in_queue = RB_INQUEUE_FALSE;
570 list_del(&(nb->rb.lh));
571 kref_put(&(nb->ref), kreffree_bug);
573 } else if (rc == QOS_RESUME_NEXTNEIGHBOR) {
574 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
575 list_del(&(nb->rb.lh));
576 list_add_tail(&(nb->rb.lh),
577 &(q->neighbors_waiting));
579 } else {
580 BUG();
583 kref_put(&(nb->ref), neighbor_free);
585 if (rc == QOS_RESUME_NEXTNEIGHBOR) {
586 spin_unlock_irqrestore(&(q->qlock), iflags);
587 return QOS_RESUME_NEXTNEIGHBOR;
592 static int send_retrans(struct neighbor *nb, int *sent);
594 static int _qos_resume(struct qos_queue *q, int caller, int *sent)
596 unsigned long iflags;
597 int rc = QOS_RESUME_DONE;
598 struct list_head *lh;
600 spin_lock_irqsave(&(q->qlock), iflags);
602 if (caller == QOS_CALLER_KPACKET)
603 lh = &(q->kpackets_waiting);
604 else if (caller == QOS_CALLER_CONN_RETRANS)
605 lh = &(q->conn_retrans_waiting);
606 else if (caller == QOS_CALLER_ANNOUNCE)
607 lh = &(q->announce_waiting);
608 else
609 BUG();
611 while (list_empty(lh) == 0) {
612 struct resume_block *rb = container_of(lh->next,
613 struct resume_block, lh);
614 ktime_t cmsg_send_start;
615 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
616 rb->in_queue = RB_INQUEUE_FALSE;
617 list_del(&(rb->lh));
619 if (caller == QOS_CALLER_KPACKET)
620 cmsg_send_start = container_of(rb, struct neighbor,
621 rb_kp)->cmsg_send_start;
623 spin_unlock_irqrestore(&(q->qlock), iflags);
624 if (caller == QOS_CALLER_KPACKET) {
625 rc = send_messages(container_of(rb, struct neighbor,
626 rb_kp), cmsg_send_start, sent);
627 } else if (caller == QOS_CALLER_CONN_RETRANS) {
628 rc = send_retrans(container_of(rb, struct neighbor,
629 rb_cr), sent);
630 } else if (caller == QOS_CALLER_ANNOUNCE) {
631 rc = _send_announce(container_of(rb,
632 struct announce_data, rb), 1, sent);
633 } else {
634 BUG();
636 spin_lock_irqsave(&(q->qlock), iflags);
638 if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET)
639 container_of(rb, struct neighbor, rb_kp
640 )->cmsg_send_start = cmsg_send_start;
642 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
643 rb->in_queue = RB_INQUEUE_TRUE;
644 list_add(&(rb->lh), lh);
645 break;
648 if (caller == QOS_CALLER_KPACKET) {
649 kref_put(&(container_of(rb, struct neighbor,
650 rb_kp)->ref), neighbor_free);
651 } else if (caller == QOS_CALLER_CONN_RETRANS) {
652 kref_put(&(container_of(rb, struct neighbor,
653 rb_cr)->ref), neighbor_free);
654 } else if (caller == QOS_CALLER_ANNOUNCE) {
655 kref_put(&(container_of(rb,
656 struct announce_data, rb)->ref),
657 announce_data_free);
658 } else {
659 BUG();
662 kref_put(&(q->ref), kreffree_bug);
665 spin_unlock_irqrestore(&(q->qlock), iflags);
667 return rc;
670 void qos_resume_taskfunc(unsigned long arg)
672 struct qos_queue *q = (struct qos_queue *) arg;
674 int rc;
675 int sent = 0;
676 unsigned long iflags;
677 int i = 0;
679 #warning todo limit runtime of resume task
681 spin_lock_irqsave(&(q->qlock), iflags);
683 while (i<4) {
684 struct list_head *lh;
686 rc = QOS_RESUME_DONE;
688 if (i == QOS_CALLER_KPACKET)
689 lh = &(q->kpackets_waiting);
690 else if (i == QOS_CALLER_CONN_RETRANS)
691 lh = &(q->conn_retrans_waiting);
692 else if (i == QOS_CALLER_ANNOUNCE)
693 lh = &(q->announce_waiting);
694 else if (i == QOS_CALLER_NEIGHBOR)
695 lh = &(q->neighbors_waiting);
696 else
697 BUG();
699 if (list_empty(lh)) {
700 i++;
701 continue;
704 spin_unlock_irqrestore(&(q->qlock), iflags);
705 if (i == QOS_CALLER_NEIGHBOR) {
706 rc = resume_neighbors(q, &sent);
707 } else {
708 rc = _qos_resume(q, i, &sent);
711 spin_lock_irqsave(&(q->qlock), iflags);
713 i = 0;
715 if (rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR)
716 break;
719 if (rc == QOS_RESUME_DONE) {
720 BUG_ON(!list_empty(&(q->kpackets_waiting)));
721 BUG_ON(!list_empty(&(q->conn_retrans_waiting)));
722 BUG_ON(!list_empty(&(q->announce_waiting)));
723 BUG_ON(!list_empty(&(q->neighbors_waiting)));
725 q->qos_resume_scheduled = 0;
726 } else {
727 unsigned long jiffies_tmp = jiffies;
728 unsigned long delay = (jiffies_tmp - q->jiffies_lastprogress +
729 3) / 4;
731 if (sent || unlikely(delay <= 0)) {
732 q->jiffies_lastprogress = jiffies_tmp;
733 delay = 1;
734 } else if (delay > HZ/10) {
735 q->jiffies_lastprogress = jiffies_tmp - (HZ*4)/10;
736 delay = HZ/10;
739 /* If we retry too fast here, we might starve layer 2 */
740 if (mod_timer(&(q->qos_resume_timer), jiffies_tmp + delay) ==
741 0) {
742 kref_get(&(q->ref));
746 qos_queue_set_congstatus(q);
748 spin_unlock_irqrestore(&(q->qlock), iflags);
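/*
 * the retry delay above is (jiffies since the last progress + 3) / 4, clamped
 * to the range 1 .. HZ/10. Example (assuming HZ == 250, i.e. 4 ms jiffies):
 * after 20 jiffies without progress the next resume attempt is scheduled
 * (20 + 3) / 4 = 5 jiffies (~20 ms) later; as soon as something is sent the
 * delay drops back to a single jiffy. This is the backoff referred to by the
 * "starve layer 2" comment above.
 */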
751 static inline int qos_queue_is_destroyed(struct qos_queue *q_locked)
753 return q_locked->dev == 0;
756 #warning todo kref (kref_put if tasklet is scheduled)
757 void qos_resume_timerfunc(struct timer_list *qos_resume_timer)
759 unsigned long iflags;
760 struct qos_queue *q = container_of(qos_resume_timer,
761 struct qos_queue, qos_resume_timer);
762 spin_lock_irqsave(&(q->qlock), iflags);
763 if (likely(!qos_queue_is_destroyed(q)))
764 tasklet_schedule(&(q->qos_resume_task));
765 spin_unlock_irqrestore(&(q->qlock), iflags);
767 kref_put(&(q->ref), free_qos);
770 struct qos_queue *get_queue(struct net_device *dev)
772 struct qos_queue *ret = 0;
773 struct list_head *curr;
775 spin_lock_bh(&(queues_lock));
776 curr = queues.next;
777 while (curr != (&queues)) {
778 struct qos_queue *q = container_of(curr,
779 struct qos_queue, queue_list);
780 if (q->dev == dev) {
781 ret = q;
782 kref_get(&(ret->ref));
783 break;
785 curr = curr->next;
787 spin_unlock_bh(&(queues_lock));
788 return ret;
791 static void _destroy_queue(struct qos_queue *q, int caller)
793 struct list_head *lh;
795 if (caller == QOS_CALLER_KPACKET)
796 lh = &(q->kpackets_waiting);
797 else if (caller == QOS_CALLER_CONN_RETRANS)
798 lh = &(q->conn_retrans_waiting);
799 else if (caller == QOS_CALLER_ANNOUNCE)
800 lh = &(q->announce_waiting);
801 else if (caller == QOS_CALLER_NEIGHBOR)
802 lh = &(q->neighbors_waiting);
803 else
804 BUG();
806 while (list_empty(lh) == 0) {
807 struct list_head *curr = lh->next;
808 struct resume_block *rb = container_of(curr,
809 struct resume_block, lh);
810 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
811 rb->in_queue = RB_INQUEUE_FALSE;
812 list_del(curr);
814 if (caller == QOS_CALLER_KPACKET) {
815 kref_put(&(container_of(rb, struct neighbor,
816 rb_kp)->ref), neighbor_free);
817 } else if (caller == QOS_CALLER_CONN_RETRANS) {
818 kref_put(&(container_of(rb, struct neighbor,
819 rb_cr)->ref), neighbor_free);
820 } else if (caller == QOS_CALLER_ANNOUNCE) {
821 kref_put(&(container_of(rb,
822 struct announce_data, rb)->ref),
823 announce_data_free);
824 } else if (caller == QOS_CALLER_NEIGHBOR) {
825 kref_put(&(container_of(rb,
826 struct neighbor, rb)->ref),
827 neighbor_free);
828 } else {
829 BUG();
831 kref_put(&(q->ref), kreffree_bug);
835 static struct qos_queue *unlink_queue(struct net_device *dev)
837 struct qos_queue *ret = 0;
838 struct list_head *curr;
840 spin_lock_bh(&(queues_lock));
841 curr = queues.next;
842 while (curr != (&queues)) {
843 struct qos_queue *q = container_of(curr,
844 struct qos_queue, queue_list);
845 if (dev == 0 || q->dev == dev) {
846 ret = q;
847 kref_get(&(ret->ref));
849 list_del(&(q->queue_list));
850 kref_put(&(q->ref), kreffree_bug);
851 break;
853 curr = curr->next;
855 spin_unlock_bh(&(queues_lock));
856 return ret;
859 int destroy_queue(struct net_device *dev)
861 int rc = 1;
862 unsigned long iflags;
864 while (1) {
865 struct qos_queue *q = unlink_queue(dev);
867 if (q == 0)
868 break;
870 rc = 0;
872 spin_lock_irqsave(&(q->qlock), iflags);
873 if (q->dev != 0) {
874 dev_put(q->dev);
875 q->dev = 0;
877 _destroy_queue(q, QOS_CALLER_KPACKET);
878 _destroy_queue(q, QOS_CALLER_CONN_RETRANS);
879 _destroy_queue(q, QOS_CALLER_ANNOUNCE);
880 _destroy_queue(q, QOS_CALLER_NEIGHBOR);
881 spin_unlock_irqrestore(&(q->qlock), iflags);
883 tasklet_kill(&(q->qos_resume_task));
885 kref_put(&(q->ref), free_qos);
888 return rc;
891 int create_queue(struct net_device *dev)
893 struct qos_queue *q = kmalloc(sizeof(struct qos_queue), GFP_KERNEL);
895 if (q == 0) {
896 printk(KERN_ERR "cor: unable to allocate memory for device "
897 "queue, not enabling device");
898 return 1;
901 memset(q, 0, sizeof(struct qos_queue));
903 spin_lock_init(&(q->qlock));
905 kref_init(&(q->ref));
907 q->dev = dev;
908 dev_hold(dev);
910 timer_setup(&(q->qos_resume_timer), qos_resume_timerfunc, 0);
911 tasklet_init(&(q->qos_resume_task), qos_resume_taskfunc,
912 (unsigned long) q);
914 INIT_LIST_HEAD(&(q->kpackets_waiting));
915 INIT_LIST_HEAD(&(q->conn_retrans_waiting));
916 INIT_LIST_HEAD(&(q->announce_waiting));
917 INIT_LIST_HEAD(&(q->neighbors_waiting));
919 atomic_set(&(q->cong_status), 0);
921 spin_lock_bh(&(queues_lock));
922 list_add(&(q->queue_list), &queues);
923 spin_unlock_bh(&(queues_lock));
925 return 0;
928 static void qos_queue_set_congstatus(struct qos_queue *q_locked)
930 __u32 newstatus;
932 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
933 newstatus = CONGSTATUS_NONE;
934 } else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
935 newstatus = CONGSTATUS_KPACKETS;
936 } else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
937 newstatus = CONGSTATUS_RETRANS;
938 } else if (list_empty(&(q_locked->announce_waiting)) == 0) {
939 newstatus = CONGSTATUS_ANNOUNCE;
940 } else if (list_empty(&(q_locked->neighbors_waiting)) == 0) {
941 newstatus = CONGSTATUS_CONNDATA;
942 } else {
943 newstatus = CONGSTATUS_NONE;
946 atomic_set(&(q_locked->cong_status), newstatus);
949 void qos_set_lastdrop(struct qos_queue *q)
951 unsigned long iflags;
953 spin_lock_irqsave(&(q->qlock), iflags);
954 q->jiffies_lastdrop = jiffies;
955 qos_queue_set_congstatus(q);
956 spin_unlock_irqrestore(&(q->qlock), iflags);
960 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
961 * caller
963 static void _qos_enqueue(struct qos_queue *q, struct resume_block *rb,
964 ktime_t cmsg_send_start, int caller, int from_nbcongwin_resume)
966 int queues_empty;
968 if (rb->in_queue == RB_INQUEUE_TRUE) {
969 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
971 if (caller == QOS_CALLER_KPACKET) {
972 struct neighbor *nb = container_of(rb, struct neighbor,
973 rb_kp);
974 if (ktime_before(cmsg_send_start, nb->cmsg_send_start))
975 nb->cmsg_send_start = cmsg_send_start;
977 return;
978 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
979 from_nbcongwin_resume == 0) {
980 return;
983 if (unlikely(qos_queue_is_destroyed(q)))
984 return;
986 queues_empty = list_empty(&(q->kpackets_waiting)) &&
987 list_empty(&(q->conn_retrans_waiting)) &&
988 list_empty(&(q->announce_waiting)) &&
989 list_empty(&(q->neighbors_waiting));
991 BUG_ON(!queues_empty && q->qos_resume_scheduled == 0);
993 rb->in_queue = RB_INQUEUE_TRUE;
995 if (caller == QOS_CALLER_KPACKET) {
996 struct neighbor *nb = container_of(rb, struct neighbor, rb_kp);
997 nb->cmsg_send_start = cmsg_send_start;
998 list_add_tail(&(rb->lh), &(q->kpackets_waiting));
999 kref_get(&(nb->ref));
1000 } else if (caller == QOS_CALLER_CONN_RETRANS) {
1001 list_add_tail(&(rb->lh) , &(q->conn_retrans_waiting));
1002 kref_get(&(container_of(rb, struct neighbor, rb_cr)->ref));
1003 } else if (caller == QOS_CALLER_ANNOUNCE) {
1004 list_add_tail(&(rb->lh), &(q->announce_waiting));
1005 kref_get(&(container_of(rb, struct announce_data, rb)->ref));
1006 } else if (caller == QOS_CALLER_NEIGHBOR) {
1007 struct neighbor *nb = container_of(rb, struct neighbor, rb);
1008 list_add_tail(&(rb->lh), &(q->neighbors_waiting));
1009 kref_get(&(nb->ref));
1010 BUG_ON(nb->conns_waiting.cnt == 0);
1011 q->numconns += nb->conns_waiting.cnt;
1012 q->priority_sum += nb->conns_waiting.priority_sum;
1013 } else {
1014 BUG();
1016 kref_get(&(q->ref));
1018 if (q->qos_resume_scheduled == 0) {
1019 q->jiffies_lastprogress = jiffies;
1020 q->qos_resume_scheduled = 1;
1021 if (caller == QOS_CALLER_KPACKET || from_nbcongwin_resume) {
1022 tasklet_schedule(&(q->qos_resume_task));
1023 } else {
1024 if (mod_timer(&(q->qos_resume_timer), jiffies + 1) ==
1025 0) {
1026 kref_get(&(q->ref));
1031 qos_queue_set_congstatus(q);
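/*
 * usage sketch (mirrors what the callers in this file do, see
 * qos_enqueue_conn() and retransmit_conn_timerfunc()): QOS_CALLER_NEIGHBOR
 * entries are enqueued directly via _qos_enqueue() with nb->conns_waiting.lock
 * and q->qlock held, e.g.
 *
 *	spin_lock(&(q->qlock));
 *	_qos_enqueue(q, &(nb->rb), ns_to_ktime(0), QOS_CALLER_NEIGHBOR, 0);
 *	spin_unlock(&(q->qlock));
 *
 * other callers go through the qos_enqueue() wrapper below, which takes
 * q->qlock itself:
 *
 *	qos_enqueue(nb->queue, &(nb->rb_cr), ns_to_ktime(0),
 *			QOS_CALLER_CONN_RETRANS);
 */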
1034 void qos_enqueue(struct qos_queue *q, struct resume_block *rb,
1035 ktime_t cmsg_send_start, int caller)
1037 unsigned long iflags;
1039 spin_lock_irqsave(&(q->qlock), iflags);
1040 _qos_enqueue(q, rb, cmsg_send_start, caller, 0);
1041 spin_unlock_irqrestore(&(q->qlock), iflags);
1044 void qos_remove_conn(struct conn *trgt_out_lx)
1046 unsigned long iflags;
1047 struct neighbor *nb = trgt_out_lx->target.out.nb;
1048 struct qos_queue *q = nb->queue;
1049 int sched_cmsg = 0;
1050 int krefput_nb = 0;
1052 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1053 BUG_ON(q == 0);
1055 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1056 if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
1057 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1058 return;
1060 spin_lock(&(q->qlock));
1062 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
1063 list_del(&(trgt_out_lx->target.out.rb.lh));
1064 BUG_ON(nb->conns_waiting.cnt == 0);
1065 nb->conns_waiting.cnt--;
1066 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1067 BUG_ON(q->numconns == 0);
1068 q->numconns--;
1071 BUG_ON(nb->conns_waiting.priority_sum <
1072 trgt_out_lx->target.out.rb_priority);
1073 BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
1074 nb->conns_waiting.priority_sum -=
1075 trgt_out_lx->target.out.rb_priority;
1076 q->priority_sum -= trgt_out_lx->target.out.rb_priority;
1077 trgt_out_lx->target.out.rb_priority = 0;
1079 if (list_empty(&(nb->conns_waiting.lh)) &&
1080 list_empty(&(nb->conns_waiting.lh_nextpass))) {
1081 BUG_ON(nb->conns_waiting.priority_sum != 0);
1082 BUG_ON(nb->conns_waiting.cnt != 0);
1083 } else {
1084 BUG_ON(nb->conns_waiting.cnt == 0);
1087 if (list_empty(&(nb->conns_waiting.lh)) &&
1088 list_empty(&(nb->conns_waiting.lh_nextpass)) &&
1089 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1090 nb->rb.in_queue = RB_INQUEUE_FALSE;
1091 list_del(&(nb->rb.lh));
1092 if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
1093 atomic_set(&(nb->cmsg_delay_conndata), 0);
1094 sched_cmsg = 1;
1097 krefput_nb = 1;
1099 BUG_ON(list_empty(&(q->neighbors_waiting)) && q->numconns != 0);
1100 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1101 q->priority_sum != 0);
1103 qos_queue_set_congstatus(q);
1106 spin_unlock(&(q->qlock));
1107 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1109 if (sched_cmsg) {
1110 spin_lock_bh(&(nb->cmsg_lock));
1111 schedule_controlmsg_timer(nb);
1112 spin_unlock_bh(&(nb->cmsg_lock));
1115 kref_put(&(trgt_out_lx->ref), kreffree_bug);
1117 if (krefput_nb)
1118 kref_put(&(nb->ref), neighbor_free);
1121 static void qos_enqueue_conn(struct conn *trgt_out_lx)
1123 unsigned long iflags;
1124 struct neighbor *nb = trgt_out_lx->target.out.nb;
1125 struct qos_queue *q;
1127 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1129 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1131 if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
1132 goto out;
1134 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
1135 list_add_tail(&(trgt_out_lx->target.out.rb.lh),
1136 &(nb->conns_waiting.lh));
1137 kref_get(&(trgt_out_lx->ref));
1138 nb->conns_waiting.cnt++;
1140 q = trgt_out_lx->target.out.nb->queue;
1141 spin_lock(&(q->qlock));
1142 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1143 q->numconns++;
1144 } else {
1145 _qos_enqueue(q, &(nb->rb), ns_to_ktime(0), QOS_CALLER_NEIGHBOR,
1148 spin_unlock(&(q->qlock));
1150 out:
1151 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1154 static struct sk_buff *create_packet(struct neighbor *nb, int size,
1155 gfp_t alloc_flags)
1157 struct sk_buff *ret;
1159 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1160 nb->dev->needed_tailroom, alloc_flags);
1161 if (unlikely(ret == 0))
1162 return 0;
1164 ret->protocol = htons(ETH_P_COR);
1165 ret->dev = nb->dev;
1167 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1168 if(unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1169 nb->dev->dev_addr, ret->len) < 0))
1170 return 0;
1171 skb_reset_network_header(ret);
1173 return ret;
1176 struct sk_buff *create_packet_cmsg(struct neighbor *nb, int size,
1177 gfp_t alloc_flags, __u64 seqno)
1179 struct sk_buff *ret;
1180 char *dest;
1182 ret = create_packet(nb, size + 7, alloc_flags);
1183 if (unlikely(ret == 0))
1184 return 0;
1186 dest = skb_put(ret, 7);
1187 BUG_ON(dest == 0);
1189 dest[0] = PACKET_TYPE_CMSG;
1190 dest += 1;
1192 put_u48(dest, seqno);
1193 dest += 6;
1195 return ret;
1198 struct sk_buff *create_packet_conndata(struct neighbor *nb, int size,
1199 gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
1200 __u8 snd_delayed_lowbuf, __u8 flush)
1202 struct sk_buff *ret;
1203 char *dest;
1205 ret = create_packet(nb, size + 11, alloc_flags);
1206 if (unlikely(ret == 0))
1207 return 0;
1209 dest = skb_put(ret, 11);
1210 BUG_ON(dest == 0);
1212 if (flush != 0) {
1213 if (snd_delayed_lowbuf != 0) {
1214 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
1215 } else {
1216 dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
1218 } else {
1219 if (snd_delayed_lowbuf != 0) {
1220 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
1221 } else {
1222 dest[0] = PACKET_TYPE_CONNDATA;
1225 dest += 1;
1227 put_u32(dest, conn_id);
1228 dest += 4;
1229 put_u48(dest, seqno);
1230 dest += 6;
1232 return ret;
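/*
 * resulting wire layout of the headers built above (as written by put_u32()
 * and put_u48()):
 *
 *	cmsg packet (7 byte header):
 *		byte 0      PACKET_TYPE_CMSG
 *		bytes 1-6   48 bit seqno
 *
 *	conndata packet (11 byte header):
 *		byte 0      one of the four PACKET_TYPE_CONNDATA* values,
 *		            chosen from the flush/snd_delayed_lowbuf flags
 *		bytes 1-4   conn_id
 *		bytes 5-10  48 bit seqno
 *
 * the payload follows directly after the header in both cases.
 */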
1235 void reschedule_conn_retrans_timer(struct neighbor *nb_retransconnlocked)
1237 struct conn_retrans *cr = 0;
1239 if (list_empty(&(nb_retransconnlocked->retrans_conn_list)))
1240 return;
1242 cr = container_of(nb_retransconnlocked->retrans_conn_list.next,
1243 struct conn_retrans, timeout_list);
1245 if (time_before_eq(cr->timeout, jiffies)) {
1246 qos_enqueue(nb_retransconnlocked->queue,
1247 &(nb_retransconnlocked->rb_cr), ns_to_ktime(0),
1248 QOS_CALLER_CONN_RETRANS);
1249 } else {
1250 if (mod_timer(&(nb_retransconnlocked->retrans_conn_timer),
1251 cr->timeout) == 0) {
1252 kref_get(&(nb_retransconnlocked->ref));
1258 * warning:
1259 * caller must also call kref_get/put, see reschedule_conn_retrans_timer
1261 static void cancel_conn_retrans(struct neighbor *nb_retransconnlocked,
1262 struct conn *trgt_out_lx, struct conn_retrans *cr,
1263 __u64 *bytes_acked)
1265 if (unlikely(cr->state == CONN_RETRANS_ACKED))
1266 return;
1268 if (cr->state == CONN_RETRANS_SCHEDULED) {
1269 list_del(&(cr->timeout_list));
1270 } else if (cr->state == CONN_RETRANS_LOWWINDOW) {
1271 BUG_ON(trgt_out_lx->target.out.retrans_lowwindow == 0);
1272 if (likely(trgt_out_lx->target.out.retrans_lowwindow != 65535))
1273 trgt_out_lx->target.out.retrans_lowwindow--;
1276 if (cr->state != CONN_RETRANS_INITIAL)
1277 *bytes_acked += cr->length;
1279 list_del(&(cr->conn_list));
1280 cr->state = CONN_RETRANS_ACKED;
1282 kref_put(&(cr->ref), free_connretrans);
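/*
 * reference counting sketch (based on the callers in this file):
 * cancel_conn_retrans() drops one reference on the conn_retrans via
 * kref_put(), so a caller that keeps using cr afterwards must hold its own
 * reference, e.g. cancel_all_conn_retrans_nb() below does
 * kref_get(&(cr->ref)) before the call and
 * kref_put(&(cr->ref), free_connretrans) after it.
 */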
1286 * nb->retrans_conn_lock must be held when calling this
1287 * (see schedule_retransmit_conn())
1289 static void cancel_acked_conn_retrans(struct conn *trgt_out_l,
1290 __u64 *bytes_acked)
1292 __u64 seqno_acked = trgt_out_l->target.out.seqno_acked;
1294 while (list_empty(&(trgt_out_l->target.out.retrans_list)) == 0) {
1295 struct conn_retrans *cr = container_of(
1296 trgt_out_l->target.out.retrans_list.next,
1297 struct conn_retrans, conn_list);
1299 if (seqno_after(cr->seqno + cr->length, seqno_acked)) {
1300 if (seqno_before(cr->seqno, seqno_acked)) {
1301 *bytes_acked += seqno_clean(seqno_acked -
1302 cr->seqno);
1303 cr->length -= seqno_clean(seqno_acked -
1304 cr->seqno);
1305 cr->seqno = seqno_acked;
1307 break;
1310 cancel_conn_retrans(trgt_out_l->target.out.nb, trgt_out_l, cr,
1311 bytes_acked);
1314 reschedule_conn_retrans_timer(trgt_out_l->target.out.nb);
1317 void cancel_all_conn_retrans(struct conn *trgt_out_lx)
1319 struct neighbor *nb = trgt_out_lx->target.out.nb;
1320 __u64 bytes_acked = 0;
1322 spin_lock_bh(&(nb->retrans_conn_lock));
1324 while (list_empty(&(trgt_out_lx->target.out.retrans_list)) == 0) {
1325 struct conn_retrans *cr = container_of(
1326 trgt_out_lx->target.out.retrans_list.next,
1327 struct conn_retrans, conn_list);
1328 BUG_ON(cr->trgt_out_o != trgt_out_lx);
1330 cancel_conn_retrans(nb, trgt_out_lx, cr, &bytes_acked);
1333 reschedule_conn_retrans_timer(nb);
1335 spin_unlock_bh(&(nb->retrans_conn_lock));
1337 if (bytes_acked > 0)
1338 nbcongwin_data_acked(nb, bytes_acked);
1341 static void cancel_all_conn_retrans_nb(struct neighbor *nb)
1343 __u64 bytes_acked = 0;
1345 while (1) {
1346 struct conn_retrans *cr;
1348 spin_lock_bh(&(nb->retrans_conn_lock));
1350 if (list_empty(&(nb->retrans_conn_list))) {
1351 spin_unlock_bh(&(nb->retrans_conn_lock));
1352 break;
1355 cr = container_of(nb->retrans_conn_list.next,
1356 struct conn_retrans, timeout_list);
1358 kref_get(&(cr->ref));
1360 spin_unlock_bh(&(nb->retrans_conn_lock));
1363 spin_lock_bh(&(cr->trgt_out_o->rcv_lock));
1364 spin_lock_bh(&(nb->retrans_conn_lock));
1366 if (likely(cr == container_of(nb->retrans_conn_list.next,
1367 struct conn_retrans, timeout_list)))
1368 cancel_conn_retrans(nb, cr->trgt_out_o, cr,
1369 &bytes_acked);
1371 spin_unlock_bh(&(nb->retrans_conn_lock));
1372 spin_unlock_bh(&(cr->trgt_out_o->rcv_lock));
1374 kref_put(&(cr->ref), free_connretrans);
1377 if (bytes_acked > 0)
1378 nbcongwin_data_acked(nb, bytes_acked);
1381 static struct conn_retrans *prepare_conn_retrans(struct conn *trgt_out_l,
1382 __u64 seqno, __u32 len, __u8 snd_delayed_lowbuf,
1383 struct conn_retrans *cr_splitted, int retransconnlocked)
1385 struct neighbor *nb = trgt_out_l->target.out.nb;
1387 struct conn_retrans *cr = kmem_cache_alloc(connretrans_slab,
1388 GFP_ATOMIC);
1390 if (unlikely(cr == 0))
1391 return 0;
1393 BUG_ON(trgt_out_l->isreset != 0);
1395 memset(cr, 0, sizeof (struct conn_retrans));
1396 cr->trgt_out_o = trgt_out_l;
1397 kref_get(&(trgt_out_l->ref));
1398 cr->seqno = seqno;
1399 cr->length = len;
1400 cr->snd_delayed_lowbuf = snd_delayed_lowbuf;
1401 kref_init(&(cr->ref));
1403 kref_get(&(cr->ref));
1404 if (retransconnlocked == 0)
1405 spin_lock_bh(&(nb->retrans_conn_lock));
1407 if (cr_splitted != 0)
1408 list_add(&(cr->conn_list), &(cr_splitted->conn_list));
1409 else
1410 list_add_tail(&(cr->conn_list),
1411 &(cr->trgt_out_o->target.out.retrans_list));
1413 if (retransconnlocked == 0)
1414 spin_unlock_bh(&(nb->retrans_conn_lock));
1416 return cr;
1419 #define RC_SENDRETRANS_OK 0
1420 #define RC_SENDRETRANS_OOM 1
1421 #define RC_SENDRETRANS_QUEUEFULL 2
1422 #define RC_SENDRETRANS_QUEUEFULLDROPPED 3
1424 static int __send_retrans(struct neighbor *nb, struct conn *trgt_out_l,
1425 struct conn_retrans *cr, __u64 *bytes_sent)
1427 __u8 flush = 0;
1429 BUG_ON(cr->length == 0);
1431 if (trgt_out_l->flush != 0 && seqno_eq(cr->seqno + cr->length,
1432 trgt_out_l->target.out.seqno_nextsend) &&
1433 trgt_out_l->data_buf.read_remaining == 0)
1434 flush = 1;
1436 if (send_conndata_as_skb(nb, cr->length)) {
1437 struct sk_buff *skb;
1438 char *dst;
1439 int rc;
1441 skb = create_packet_conndata(nb, cr->length, GFP_ATOMIC,
1442 trgt_out_l->target.out.conn_id, cr->seqno,
1443 cr->snd_delayed_lowbuf, flush);
1444 if (unlikely(skb == 0))
1445 return RC_SENDRETRANS_OOM;
1447 dst = skb_put(skb, cr->length);
1449 databuf_pullold(trgt_out_l, cr->seqno, dst, cr->length);
1451 rc = cor_dev_queue_xmit(skb, nb->queue,
1452 QOS_CALLER_CONN_RETRANS);
1453 if (rc == NET_XMIT_DROP)
1454 return RC_SENDRETRANS_QUEUEFULLDROPPED;
1455 schedule_retransmit_conn(cr, 1, 0);
1456 if (rc != NET_XMIT_SUCCESS)
1457 return RC_SENDRETRANS_QUEUEFULL;
1459 } else {
1460 struct control_msg_out *cm;
1461 char *buf;
1463 buf = kmalloc(cr->length, GFP_ATOMIC);
1464 if (unlikely(buf == 0))
1465 return RC_SENDRETRANS_OOM;
1467 cm = alloc_control_msg(nb, ACM_PRIORITY_LOW);
1468 if (unlikely(cm == 0)) {
1469 kfree(buf);
1470 return RC_SENDRETRANS_OOM;
1473 databuf_pullold(trgt_out_l, cr->seqno, buf, cr->length);
1475 send_conndata(cm, trgt_out_l->target.out.conn_id,
1476 cr->seqno, buf, buf, cr->length,
1477 cr->snd_delayed_lowbuf, flush,
1478 trgt_out_l->is_highlatency, cr);
1481 *bytes_sent += cr->length;
1483 return RC_SENDRETRANS_OK;
1486 static int _send_retrans_splitcr_ifneeded(struct neighbor *nb_retransconnlocked,
1487 struct conn *trgt_out_l, struct conn_retrans *cr)
1489 __u32 targetmss = mss_conndata(nb_retransconnlocked,
1490 trgt_out_l->is_highlatency != 0);
1491 __u64 windowlimit = seqno_clean(
1492 trgt_out_l->target.out.seqno_windowlimit -
1493 cr->seqno);
1494 __u32 maxsize = targetmss;
1495 if (windowlimit < maxsize)
1496 maxsize = windowlimit;
1498 if (unlikely(cr->length > maxsize)) {
1499 struct conn_retrans *cr2 = prepare_conn_retrans(trgt_out_l,
1500 cr->seqno + maxsize, cr->length - maxsize,
1501 cr->snd_delayed_lowbuf, cr, 1);
1502 if (unlikely(cr2 == 0))
1503 return RC_SENDRETRANS_OOM;
1505 cr2->timeout = cr->timeout;
1507 list_add(&(cr2->timeout_list),
1508 &(nb_retransconnlocked->retrans_conn_list));
1509 cr2->state = CONN_RETRANS_SCHEDULED;
1511 cr->length = maxsize;
1514 return RC_SENDRETRANS_OK;
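/*
 * example of the split above (mss value made up for illustration): with a
 * conndata mss of 1400 bytes, a window that allows 3000 more bytes from
 * cr->seqno and cr->length == 4000, maxsize becomes 1400; a second
 * conn_retrans covering seqno + 1400 .. seqno + 4000 (2600 bytes) is put on
 * the retrans_conn_list and the original cr is trimmed to 1400 bytes, so only
 * one mss worth of data is retransmitted per pass.
 */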
1517 static int _send_retrans(struct neighbor *nb, struct conn_retrans *cr,
1518 __u64 *bytes_sent)
1521 struct conn *trgt_out_o = cr->trgt_out_o;
1522 int rc = RC_SENDRETRANS_OK;
1524 spin_lock_bh(&(trgt_out_o->rcv_lock));
1526 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1527 BUG_ON(trgt_out_o->target.out.nb != nb);
1529 spin_lock_bh(&(nb->retrans_conn_lock));
1530 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1531 spin_unlock_bh(&(nb->retrans_conn_lock));
1532 goto out;
1535 BUG_ON(trgt_out_o->isreset != 0);
1537 BUG_ON(seqno_before(cr->seqno, trgt_out_o->target.out.seqno_acked));
1539 if (seqno_after_eq(cr->seqno,
1540 trgt_out_o->target.out.seqno_windowlimit)) {
1541 BUG_ON(cr->state != CONN_RETRANS_SENDING);
1542 cr->state = CONN_RETRANS_LOWWINDOW;
1543 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1544 trgt_out_o->target.out.retrans_lowwindow++;
1546 spin_unlock_bh(&(nb->retrans_conn_lock));
1547 goto out;
1550 rc = _send_retrans_splitcr_ifneeded(nb, trgt_out_o, cr);
1552 spin_unlock_bh(&(nb->retrans_conn_lock));
1554 kref_get(&(trgt_out_o->ref));
1556 if (rc == RC_SENDRETRANS_OK)
1557 rc = __send_retrans(nb, trgt_out_o, cr, bytes_sent);
1559 if (rc == RC_SENDRETRANS_OOM || rc == RC_SENDRETRANS_QUEUEFULLDROPPED) {
1560 spin_lock_bh(&(nb->retrans_conn_lock));
1561 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1562 } else if (likely(cr->state == CONN_RETRANS_SENDING)) {
1563 if (rc == RC_SENDRETRANS_OOM)
1564 cr->timeout = jiffies + 1;
1565 list_add(&(cr->timeout_list), &(nb->retrans_conn_list));
1566 cr->state = CONN_RETRANS_SCHEDULED;
1567 } else {
1568 BUG();
1570 spin_unlock_bh(&(nb->retrans_conn_lock));
1573 out:
1574 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1576 kref_put(&(trgt_out_o->ref), free_conn);
1578 return (rc == RC_SENDRETRANS_OOM ||
1579 rc == RC_SENDRETRANS_QUEUEFULL ||
1580 rc == RC_SENDRETRANS_QUEUEFULLDROPPED);
1583 static int send_retrans(struct neighbor *nb, int *sent)
1585 int queuefull = 0;
1586 int nbstate = get_neigh_state(nb);
1587 __u64 bytes_sent = 0;
1589 if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
1590 return QOS_RESUME_DONE;
1591 } else if (unlikely(nbstate == NEIGHBOR_STATE_KILLED)) {
1593 * cancel_all_conn_retrans_nb should not be needed, because
1594 * reset_all_conns calls cancel_all_conn_retrans
1596 cancel_all_conn_retrans_nb(nb);
1597 return QOS_RESUME_DONE;
1600 while (1) {
1601 struct conn_retrans *cr = 0;
1603 spin_lock_bh(&(nb->retrans_conn_lock));
1605 if (list_empty(&(nb->retrans_conn_list))) {
1606 spin_unlock_bh(&(nb->retrans_conn_lock));
1607 break;
1610 cr = container_of(nb->retrans_conn_list.next,
1611 struct conn_retrans, timeout_list);
1613 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1615 if (time_after(cr->timeout, jiffies)) {
1616 spin_unlock_bh(&(nb->retrans_conn_lock));
1617 break;
1620 kref_get(&(cr->ref));
1621 list_del(&(cr->timeout_list));
1622 cr->state = CONN_RETRANS_SENDING;
1624 spin_unlock_bh(&(nb->retrans_conn_lock));
1626 queuefull = _send_retrans(nb, cr, &bytes_sent);
1627 kref_put(&(cr->ref), free_connretrans);
1628 if (queuefull) {
1629 break;
1630 } else {
1631 *sent = 1;
1635 if (bytes_sent > 0)
1636 nbcongwin_data_retransmitted(nb, bytes_sent);
1638 return queuefull ? QOS_RESUME_CONG : QOS_RESUME_DONE;
1641 void retransmit_conn_timerfunc(struct timer_list *retrans_conn_timer)
1643 struct neighbor *nb = container_of(retrans_conn_timer,
1644 struct neighbor, retrans_conn_timer);
1645 qos_enqueue(nb->queue, &(nb->rb_cr), ns_to_ktime(0),
1646 QOS_CALLER_CONN_RETRANS);
1647 kref_put(&(nb->ref), neighbor_free);
1650 static void conn_ack_ooo_rcvd_splitcr(struct conn *trgt_out_l,
1651 struct conn_retrans *cr, __u64 seqno_ooo, __u32 length,
1652 __u64 *bytes_acked)
1654 struct conn_retrans *cr2;
1655 __u64 seqno_cr2start;
1656 __u32 oldcrlength = cr->length;
1658 if (cr->state != CONN_RETRANS_SCHEDULED &&
1659 cr->state != CONN_RETRANS_LOWWINDOW)
1660 return;
1662 seqno_cr2start = seqno_ooo+length;
1663 cr2 = prepare_conn_retrans(trgt_out_l, seqno_cr2start,
1664 seqno_clean(cr->seqno + cr->length - seqno_cr2start),
1665 cr->snd_delayed_lowbuf, cr, 1);
1667 if (unlikely(cr2 == 0))
1668 return;
1670 BUG_ON(cr2->length > cr->length);
1672 cr2->timeout = cr->timeout;
1673 cr2->state = cr->state;
1675 if (cr->state == CONN_RETRANS_SCHEDULED)
1676 list_add(&(cr2->timeout_list), &(cr->timeout_list));
1678 BUG_ON(seqno_clean(seqno_ooo - cr->seqno) > cr->length);
1680 cr->length = seqno_clean(seqno_ooo - cr->seqno);
1681 BUG_ON(cr->length + length + cr2->length != oldcrlength);
1683 *bytes_acked += length;
1686 void conn_ack_ooo_rcvd(struct neighbor *nb, __u32 conn_id,
1687 struct conn *trgt_out, __u64 seqno_ooo, __u32 length,
1688 __u64 *bytes_acked)
1690 struct list_head *curr;
1692 if (unlikely(length == 0))
1693 return;
1695 spin_lock_bh(&(trgt_out->rcv_lock));
1697 if (unlikely(trgt_out->targettype != TARGET_OUT))
1698 goto out;
1699 if (unlikely(trgt_out->target.out.nb != nb))
1700 goto out;
1701 if (unlikely(trgt_out->target.out.conn_id != conn_id))
1702 goto out;
1704 kref_get(&(nb->ref));
1705 spin_lock_bh(&(nb->retrans_conn_lock));
1707 curr = trgt_out->target.out.retrans_list.next;
1709 while (curr != &(trgt_out->target.out.retrans_list)) {
1710 struct conn_retrans *cr = container_of(curr,
1711 struct conn_retrans, conn_list);
1713 int ack_covers_start = seqno_after_eq(cr->seqno, seqno_ooo);
1714 int ack_covers_end = seqno_before_eq(cr->seqno + cr->length,
1715 seqno_ooo + length);
1717 curr = curr->next;
1719 if (seqno_before(cr->seqno + cr->length, seqno_ooo))
1720 continue;
1722 if (seqno_after(cr->seqno, seqno_ooo + length))
1723 break;
1725 if (likely(ack_covers_start && ack_covers_end)) {
1726 cancel_conn_retrans(nb, trgt_out, cr, bytes_acked);
1727 reschedule_conn_retrans_timer(nb);
1728 } else if (ack_covers_start) {
1729 __u32 diff = seqno_ooo + length - cr->seqno -
1730 cr->length;
1731 BUG_ON(diff >= cr->length);
1732 cr->seqno += diff;
1733 cr->length -= diff;
1734 *bytes_acked += diff;
1735 } else if (ack_covers_end) {
1736 __u32 diff = seqno_ooo + length - cr->seqno;
1737 BUG_ON(diff >= length);
1738 cr->length -= diff;
1739 *bytes_acked += diff;
1740 } else {
1741 conn_ack_ooo_rcvd_splitcr(trgt_out, cr, seqno_ooo,
1742 length, bytes_acked);
1743 break;
1747 if (unlikely(list_empty(&(trgt_out->target.out.retrans_list)))) {
1748 trgt_out->target.out.seqno_acked =
1749 trgt_out->target.out.seqno_nextsend;
1750 } else {
1751 struct conn_retrans *cr = container_of(
1752 trgt_out->target.out.retrans_list.next,
1753 struct conn_retrans, conn_list);
1754 if (seqno_after(cr->seqno, trgt_out->target.out.seqno_acked))
1755 trgt_out->target.out.seqno_acked = cr->seqno;
1758 spin_unlock_bh(&(nb->retrans_conn_lock));
1759 kref_put(&(nb->ref), neighbor_free);
1761 out:
1762 spin_unlock_bh(&(trgt_out->rcv_lock));
1765 static void _conn_ack_rcvd_nosendwin(struct conn *trgt_out_l)
1767 if (trgt_out_l->bufsize.state == BUFSIZE_INCR ||
1768 trgt_out_l->bufsize.state == BUFSIZE_INCR_FAST)
1769 trgt_out_l->bufsize.state = BUFSIZE_NOACTION;
1771 if (trgt_out_l->bufsize.state == BUFSIZE_NOACTION)
1772 trgt_out_l->bufsize.act.noact.bytesleft = max(
1773 trgt_out_l->bufsize.act.noact.bytesleft,
1774 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1776 trgt_out_l->bufsize.ignore_rcv_lowbuf = max(
1777 trgt_out_l->bufsize.ignore_rcv_lowbuf,
1778 (__u32) BUF_OUT_WIN_NOK_NOINCR);
1782 * nb->retrans_conn_lock must be held when calling this
1783 * (see schedule_retransmit_conn())
1785 static void reschedule_lowwindow_retrans(struct conn *trgt_out_l)
1787 struct list_head *lh = trgt_out_l->target.out.retrans_list.next;
1788 int cnt = 0;
1790 while (trgt_out_l->target.out.retrans_lowwindow > 0 && cnt < 100) {
1791 struct conn_retrans *cr;
1793 if (unlikely(lh == &(trgt_out_l->target.out.retrans_list))) {
1794 BUG_ON(trgt_out_l->target.out.retrans_lowwindow !=
1795 65535);
1796 trgt_out_l->target.out.retrans_lowwindow = 0;
1797 break;
1800 cr = container_of(lh, struct conn_retrans, conn_list);
1802 if (seqno_after_eq(cr->seqno,
1803 trgt_out_l->target.out.seqno_windowlimit)) {
1804 break;
1807 if (cr->state == CONN_RETRANS_LOWWINDOW)
1808 schedule_retransmit_conn(cr, 1, 1);
1810 lh = lh->next;
1811 cnt++;
1815 void conn_ack_rcvd(struct neighbor *nb, __u32 conn_id, struct conn *trgt_out,
1816 __u64 seqno, int setwindow, __u8 window, __u64 *bytes_acked)
1818 int seqno_advanced = 0;
1819 int window_enlarged = 0;
1821 spin_lock_bh(&(trgt_out->rcv_lock));
1823 if (unlikely(trgt_out->isreset != 0))
1824 goto out;
1825 if (unlikely(trgt_out->targettype != TARGET_OUT))
1826 goto out;
1827 if (unlikely(trgt_out->target.out.nb != nb))
1828 goto out;
1829 if (unlikely(trgt_out->reversedir->source.in.conn_id != conn_id))
1830 goto out;
1832 if (unlikely(seqno_after(seqno, trgt_out->target.out.seqno_nextsend) ||
1833 seqno_before(seqno, trgt_out->target.out.seqno_acked)))
1834 goto out;
1836 if (setwindow) {
1837 __u64 windowdec = dec_log_64_7(window);
1838 if (likely(seqno_after(seqno,
1839 trgt_out->target.out.seqno_acked)) ||
1840 seqno_after(seqno + windowdec,
1841 trgt_out->target.out.seqno_windowlimit)) {
1842 trgt_out->target.out.seqno_windowlimit = seqno +
1843 windowdec;
1844 window_enlarged = 1;
1848 if (seqno_after(seqno, trgt_out->target.out.seqno_acked))
1849 seqno_advanced = 1;
1851 if (seqno_advanced == 0 && window_enlarged == 0)
1852 goto out;
1854 kref_get(&(nb->ref));
1855 spin_lock_bh(&(nb->retrans_conn_lock));
1857 if (seqno_advanced) {
1858 trgt_out->target.out.seqno_acked = seqno;
1859 cancel_acked_conn_retrans(trgt_out, bytes_acked);
1862 if (window_enlarged)
1863 reschedule_lowwindow_retrans(trgt_out);
1865 spin_unlock_bh(&(nb->retrans_conn_lock));
1866 kref_put(&(nb->ref), neighbor_free);
1868 if (seqno_advanced)
1869 databuf_ack(trgt_out, trgt_out->target.out.seqno_acked);
1871 if (seqno_eq(trgt_out->target.out.seqno_acked,
1872 trgt_out->target.out.seqno_nextsend))
1873 _conn_ack_rcvd_nosendwin(trgt_out);
1875 out:
1876 if (seqno_advanced || window_enlarged)
1877 flush_buf(trgt_out);
1879 spin_unlock_bh(&(trgt_out->rcv_lock));
1881 wake_sender(trgt_out);
1884 static void try_combine_conn_retrans_prev(struct neighbor *nb_retransconnlocked,
1885 struct conn *trgt_out_lx, struct conn_retrans *cr)
1887 struct conn_retrans *cr_prev;
1888 __u64 bytes_dummyacked = 0;
1890 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1892 if (cr->conn_list.prev == &(trgt_out_lx->target.out.retrans_list))
1893 return;
1895 cr_prev = container_of(cr->conn_list.prev, struct conn_retrans,
1896 conn_list);
1898 if (cr_prev->state != CONN_RETRANS_SCHEDULED)
1899 return;
1900 if (cr_prev->timeout != cr->timeout)
1901 return;
1902 if (!seqno_eq(cr_prev->seqno + cr_prev->length, cr->seqno))
1903 return;
1905 cr->seqno -= cr_prev->length;
1906 cr->length += cr_prev->length;
1908 cancel_conn_retrans(nb_retransconnlocked, trgt_out_lx, cr_prev,
1909 &bytes_dummyacked);
1912 static void try_combine_conn_retrans_next(struct neighbor *nb_retranslocked,
1913 struct conn *trgt_out_lx, struct conn_retrans *cr)
1915 struct conn_retrans *cr_next;
1916 __u64 bytes_dummyacked = 0;
1918 BUG_ON(cr->state != CONN_RETRANS_SCHEDULED);
1920 if (cr->conn_list.next == &(trgt_out_lx->target.out.retrans_list))
1921 return;
1923 cr_next = container_of(cr->conn_list.next, struct conn_retrans,
1924 conn_list);
1926 if (cr_next->state != CONN_RETRANS_SCHEDULED)
1927 return;
1928 if (cr_next->timeout != cr->timeout)
1929 return;
1930 if (!seqno_eq(cr->seqno + cr->length, cr_next->seqno))
1931 return;
1933 cr->length += cr_next->length;
1935 cancel_conn_retrans(nb_retranslocked, trgt_out_lx, cr_next,
1936 &bytes_dummyacked);
1939 void schedule_retransmit_conn(struct conn_retrans *cr, int connlocked,
1940 int nbretransconn_locked)
1942 struct conn *trgt_out_o = cr->trgt_out_o;
1943 struct neighbor *nb;
1944 int first;
1946 if (connlocked == 0)
1947 spin_lock_bh(&(trgt_out_o->rcv_lock));
1949 BUG_ON(trgt_out_o->targettype != TARGET_OUT);
1950 nb = trgt_out_o->target.out.nb;
1952 cr->timeout = calc_timeout(atomic_read(&(nb->latency_retrans_us)),
1953 atomic_read(&(nb->latency_stddev_retrans_us)),
1954 atomic_read(&(nb->max_remote_ackconn_delay_us)));
1956 if (nbretransconn_locked == 0)
1957 spin_lock_bh(&(nb->retrans_conn_lock));
1959 kref_get(&(nb->ref));
1961 BUG_ON(cr->state == CONN_RETRANS_SCHEDULED);
1963 if (unlikely(cr->state == CONN_RETRANS_ACKED)) {
1964 goto out;
1965 } else if (unlikely(cr->state == CONN_RETRANS_LOWWINDOW)) {
1966 BUG_ON(trgt_out_o->target.out.retrans_lowwindow == 0);
1967 if (likely(trgt_out_o->target.out.retrans_lowwindow != 65535))
1968 trgt_out_o->target.out.retrans_lowwindow--;
1971 first = unlikely(list_empty(&(nb->retrans_conn_list)));
1972 list_add_tail(&(cr->timeout_list), &(nb->retrans_conn_list));
1973 cr->state = CONN_RETRANS_SCHEDULED;
1975 if (unlikely(first)) {
1976 reschedule_conn_retrans_timer(nb);
1977 } else {
1978 try_combine_conn_retrans_prev(nb, trgt_out_o, cr);
1979 try_combine_conn_retrans_next(nb, trgt_out_o, cr);
1982 out:
1983 if (nbretransconn_locked == 0)
1984 spin_unlock_bh(&(nb->retrans_conn_lock));
1986 kref_put(&(nb->ref), neighbor_free);
1988 if (connlocked == 0)
1989 spin_unlock_bh(&(trgt_out_o->rcv_lock));
1992 static int _flush_out_skb(struct conn *trgt_out_lx, __u32 len,
1993 __u8 snd_delayed_lowbuf)
1995 struct neighbor *nb = trgt_out_lx->target.out.nb;
1997 __u64 seqno;
1998 struct conn_retrans *cr;
1999 struct sk_buff *skb;
2000 char *dst;
2001 __u8 flush = 0;
2002 int rc;
2004 if (trgt_out_lx->flush != 0 &&
2005 trgt_out_lx->data_buf.read_remaining == len)
2006 flush = 1;
2008 seqno = trgt_out_lx->target.out.seqno_nextsend;
2009 skb = create_packet_conndata(trgt_out_lx->target.out.nb, len,
2010 GFP_ATOMIC, trgt_out_lx->target.out.conn_id, seqno,
2011 snd_delayed_lowbuf, flush);
2012 if (unlikely(skb == 0))
2013 return RC_FLUSH_CONN_OUT_OOM;
2015 cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
2016 0, 0);
2017 if (unlikely(cr == 0)) {
2018 kfree_skb(skb);
2019 return RC_FLUSH_CONN_OUT_OOM;
2022 dst = skb_put(skb, len);
2024 databuf_pull(trgt_out_lx, dst, len);
2026 rc = cor_dev_queue_xmit(skb, nb->queue, QOS_CALLER_NEIGHBOR);
2027 if (rc == NET_XMIT_DROP) {
2028 databuf_unpull(trgt_out_lx, len);
2029 spin_lock_bh(&(nb->retrans_conn_lock));
2030 cancel_conn_retrans(nb, trgt_out_lx, cr, 0);
2031 spin_unlock_bh(&(nb->retrans_conn_lock));
2032 kref_put(&(cr->ref), free_connretrans);
2033 return RC_FLUSH_CONN_OUT_CONG;
2036 trgt_out_lx->target.out.seqno_nextsend += len;
2037 nbcongwin_data_sent(nb, len);
2038 schedule_retransmit_conn(cr, 1, 0);
2039 if (trgt_out_lx->sourcetype == SOURCE_SOCK)
2040 update_src_sock_sndspeed(trgt_out_lx, len);
2042 kref_put(&(cr->ref), free_connretrans);
2044 return (rc == NET_XMIT_SUCCESS) ?
2045 RC_FLUSH_CONN_OUT_OK : RC_FLUSH_CONN_OUT_SENT_CONG;
static int _flush_out_conndata(struct conn *trgt_out_lx, __u32 len,
		__u8 snd_delayed_lowbuf)
{
	__u64 seqno;
	struct control_msg_out *cm;
	struct conn_retrans *cr;
	char *buf;
	__u8 flush = 0;

	if (trgt_out_lx->flush != 0 &&
			trgt_out_lx->data_buf.read_remaining == len)
		flush = 1;

	buf = kmalloc(len, GFP_ATOMIC);
	if (unlikely(buf == 0))
		return RC_FLUSH_CONN_OUT_OOM;

	cm = alloc_control_msg(trgt_out_lx->target.out.nb, ACM_PRIORITY_LOW);
	if (unlikely(cm == 0)) {
		kfree(buf);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	seqno = trgt_out_lx->target.out.seqno_nextsend;

	cr = prepare_conn_retrans(trgt_out_lx, seqno, len, snd_delayed_lowbuf,
			0, 0);
	if (unlikely(cr == 0)) {
		kfree(buf);
		free_control_msg(cm);
		return RC_FLUSH_CONN_OUT_OOM;
	}

	databuf_pull(trgt_out_lx, buf, len);
	trgt_out_lx->target.out.seqno_nextsend += len;
	nbcongwin_data_sent(trgt_out_lx->target.out.nb, len);
	if (trgt_out_lx->sourcetype == SOURCE_SOCK)
		update_src_sock_sndspeed(trgt_out_lx, len);

	send_conndata(cm, trgt_out_lx->target.out.conn_id, seqno, buf, buf, len,
			snd_delayed_lowbuf, flush, trgt_out_lx->is_highlatency,
			cr);

	return RC_FLUSH_CONN_OUT_OK;
}
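
/**
 * srcin_buflimit_reached - returns 1 if the announced receive window of the
 * incoming side is already exceeded, falls below WINDOW_ENCODE_MIN, or if
 * more than half of the remaining window is still sitting unread in the
 * data buffer; returns 0 otherwise.
 */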
int srcin_buflimit_reached(struct conn *src_in_lx)
{
	__u64 window_left;

	if (unlikely(seqno_before(src_in_lx->source.in.window_seqnolimit,
			src_in_lx->source.in.next_seqno)))
		return 1;

	window_left = seqno_clean(src_in_lx->source.in.window_seqnolimit -
			src_in_lx->source.in.next_seqno);

	if (window_left < WINDOW_ENCODE_MIN)
		return 1;

	if (window_left / 2 < src_in_lx->data_buf.read_remaining)
		return 1;

	return 0;
}
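
/**
 * maxsend_left_to_len - round a remaining send budget down to a convenient
 * chunk length: budgets below 128 bytes are used as-is, budgets below 4096
 * bytes are rounded down to a power of two, larger budgets are rounded down
 * to a multiple of 4096 bytes.
 */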
static __u32 maxsend_left_to_len(__u32 maxsend_left)
{
	__u32 i;

	if (maxsend_left < 128)
		return maxsend_left;

	for (i = 128; i < 4096;) {
		if (i * 2 > maxsend_left)
			return i;
		i = i * 2;
	}

	return maxsend_left - maxsend_left % 4096;
}
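
/**
 * seqno_low_sendlimit - returns 1 if, after sending sndlen bytes, the
 * remaining window would be smaller than 1/4 (high latency conns) or 1/8
 * (low latency conns) of the data then in flight, or if windowlimit does not
 * even cover sndlen itself.
 */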
static int seqno_low_sendlimit(struct conn *trgt_out_lx, __u64 windowlimit,
		__u32 sndlen)
{
	__u64 bytes_ackpending;

	BUG_ON(seqno_before(trgt_out_lx->target.out.seqno_nextsend,
			trgt_out_lx->target.out.seqno_acked));

	bytes_ackpending = seqno_clean(trgt_out_lx->target.out.seqno_nextsend -
			trgt_out_lx->target.out.seqno_acked);

	if (windowlimit <= sndlen)
		return 1;

	if (unlikely(bytes_ackpending + sndlen < bytes_ackpending))
		return 0;

	if (trgt_out_lx->is_highlatency != 0)
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 4)
				? 1 : 0;
	else
		return (windowlimit - sndlen < (bytes_ackpending + sndlen) / 8)
				? 1 : 0;
}
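
/* Raise ignore_rcv_lowbuf to at least the current buffer size, presumably so
 * that a low receive buffer no longer delays flushing this conn. */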
static void _flush_out_ignore_lowbuf(struct conn *trgt_out_lx)
{
	trgt_out_lx->bufsize.ignore_rcv_lowbuf = max(
			trgt_out_lx->bufsize.ignore_rcv_lowbuf,
			trgt_out_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
}
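
/**
 * get_windowlimit - number of bytes that may still be sent within the window
 * announced by the receiver; 0 if seqno_nextsend has already passed
 * seqno_windowlimit.
 */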
static __u64 get_windowlimit(struct conn *trgt_out_lx)
{
	if (unlikely(seqno_before(trgt_out_lx->target.out.seqno_windowlimit,
			trgt_out_lx->target.out.seqno_nextsend)))
		return 0;

	return seqno_clean(trgt_out_lx->target.out.seqno_windowlimit -
			trgt_out_lx->target.out.seqno_nextsend);
}
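
/**
 * _flush_out - send as much buffered data of a conn as the send window, the
 * neighbor congestion window and the maxsend budget allow
 *
 * Full targetmss sized chunks are sent first in a loop; a trailing chunk
 * smaller than targetmss may follow unless it seems better to wait for more
 * data from the source. Each chunk goes out either as a dedicated skb or
 * piggybacked on a control message, depending on send_conndata_as_skb().
 */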
static int _flush_out(struct conn *trgt_out_lx, __u32 maxsend, __u32 *sent,
		int from_qos, int maxsend_forcedelay)
{
	struct neighbor *nb = trgt_out_lx->target.out.nb;

	__u32 targetmss;

	int nbstate;

	__u8 snd_delayed_lowbuf = trgt_out_lx->target.out.windowlimit_reached;

	__u32 maxsend_left = maxsend;

	trgt_out_lx->target.out.windowlimit_reached = 0;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);

	if (unlikely(trgt_out_lx->target.out.established == 0))
		return RC_FLUSH_CONN_OUT_OK;

	if (unlikely(trgt_out_lx->isreset != 0))
		return RC_FLUSH_CONN_OUT_OK;

	BUG_ON(trgt_out_lx->target.out.conn_id == 0);

	if (unlikely(trgt_out_lx->data_buf.read_remaining == 0))
		return RC_FLUSH_CONN_OUT_OK;

#warning todo burst queue
	if (from_qos == 0 && qos_fastsend_allowed_conn(trgt_out_lx) == 0)
		return RC_FLUSH_CONN_OUT_CONG;

	spin_lock_bh(&(nb->stalledconn_lock));
	nbstate = get_neigh_state(nb);
	if (unlikely(nbstate == NEIGHBOR_STATE_STALLED)) {
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev == 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next != 0);
		BUG_ON(trgt_out_lx->target.out.nbstalled_lh.prev != 0 &&
				trgt_out_lx->target.out.nbstalled_lh.next == 0);

		if (trgt_out_lx->target.out.nbstalled_lh.prev == 0) {
			kref_get(&(trgt_out_lx->ref));
			list_add_tail(&(trgt_out_lx->target.out.nbstalled_lh),
					&(nb->stalledconn_list));
		}
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	if (unlikely(nbstate != NEIGHBOR_STATE_ACTIVE))
		return RC_FLUSH_CONN_OUT_NBNOTACTIVE;

	/* printk(KERN_ERR "flush %p %llu %u", trgt_out_l,
			get_windowlimit(trgt_out_l),
			trgt_out_l->data_buf.read_remaining); */

	targetmss = mss_conndata(nb, trgt_out_lx->is_highlatency != 0);

	while (trgt_out_lx->data_buf.read_remaining >= targetmss) {
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < targetmss)
			break;

		if (windowlimit < targetmss) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			break;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, targetmss)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (likely(send_conndata_as_skb(nb, targetmss)))
			rc = _flush_out_skb(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, targetmss,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= targetmss;
			*sent += targetmss;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

	if (trgt_out_lx->data_buf.read_remaining > 0) {
		__u32 len = trgt_out_lx->data_buf.read_remaining;
		__u64 windowlimit = get_windowlimit(trgt_out_lx);
		int rc;

		if (maxsend_left < len) {
			if (maxsend_left >= 65536 || (
					maxsend_left == maxsend &&
					maxsend_left >= 128 &&
					trgt_out_lx->is_highlatency == 0 &&
					!maxsend_forcedelay)) {
				len = maxsend_left_to_len(maxsend_left);
			} else {
				return RC_FLUSH_CONN_OUT_MAXSENT;
			}
		}

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_SOCK &&
				cor_sock_sndbufavailable(trgt_out_lx) != 0)
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_IN &&
				srcin_buflimit_reached(trgt_out_lx) == 0 && (
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0 ||
				trgt_out_lx->is_highlatency != 0))
			goto out;

		if (trgt_out_lx->flush == 0 &&
				trgt_out_lx->sourcetype == SOURCE_UNCONNECTED &&
				cpacket_write_allowed(trgt_out_lx) != 0)
			goto out;

		if (windowlimit == 0 || (windowlimit < len &&
				seqno_eq(trgt_out_lx->target.out.seqno_nextsend,
				trgt_out_lx->target.out.seqno_acked) == 0)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
			_flush_out_ignore_lowbuf(trgt_out_lx);
			goto out;
		}

		if (nbcongwin_send_allowed(nb) == 0)
			return RC_FLUSH_CONN_OUT_CONG;

		if (seqno_low_sendlimit(trgt_out_lx, windowlimit, len)) {
			trgt_out_lx->target.out.windowlimit_reached = 1;
			snd_delayed_lowbuf = 1;
		}

		if (len > windowlimit) {
			len = windowlimit;
			_flush_out_ignore_lowbuf(trgt_out_lx);
		}

		if (send_conndata_as_skb(nb, len))
			rc = _flush_out_skb(trgt_out_lx, len,
					snd_delayed_lowbuf);
		else
			rc = _flush_out_conndata(trgt_out_lx, len,
					snd_delayed_lowbuf);

		if (rc == RC_FLUSH_CONN_OUT_OK ||
				rc == RC_FLUSH_CONN_OUT_SENT_CONG) {
			maxsend_left -= len;
			*sent += len;
		}

		if (rc == RC_FLUSH_CONN_OUT_SENT_CONG)
			return RC_FLUSH_CONN_OUT_CONG;
		if (rc != RC_FLUSH_CONN_OUT_OK)
			return rc;
	}

out:
	return RC_FLUSH_CONN_OUT_OK;
}
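
/**
 * flush_out - flush a conn with an effectively unlimited send budget; on
 * congestion, out of memory or an exhausted budget the conn is enqueued for
 * a later qos resume.
 */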
int flush_out(struct conn *trgt_out_lx, __u32 *sent)
{
	int rc = _flush_out(trgt_out_lx, 1 << 30, sent, 0, 0);

	if (rc == RC_FLUSH_CONN_OUT_CONG || rc == RC_FLUSH_CONN_OUT_MAXSENT ||
			rc == RC_FLUSH_CONN_OUT_OOM)
		qos_enqueue_conn(trgt_out_lx);

	return rc;
}
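
/**
 * resume_nbstalled_conns - work item that retries flushing the conns which
 * were put on stalledconn_list while the neighbor was stalled
 *
 * Stops early once a flush reports that the neighbor is not active.
 */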
void resume_nbstalled_conns(struct work_struct *work)
{
	struct neighbor *nb = container_of(work, struct neighbor,
			stalledconn_work);
	int rc = RC_FLUSH_CONN_OUT_OK;

	spin_lock_bh(&(nb->stalledconn_lock));
	nb->stalledconn_work_scheduled = 0;
	while (rc != RC_FLUSH_CONN_OUT_NBNOTACTIVE &&
			list_empty(&(nb->stalledconn_list)) == 0) {
		struct list_head *lh = nb->stalledconn_list.next;
		struct conn *trgt_out = container_of(lh, struct conn,
				target.out.nbstalled_lh);
		__u32 sent = 0;

		BUG_ON(trgt_out->targettype != TARGET_OUT);
		list_del(lh);
		lh->prev = 0;
		lh->next = 0;

		spin_unlock_bh(&(nb->stalledconn_lock));

		spin_lock_bh(&(trgt_out->rcv_lock));
		if (likely(trgt_out->targettype == TARGET_OUT))
			rc = flush_out(trgt_out, &sent);
		spin_unlock_bh(&(trgt_out->rcv_lock));

		if (sent != 0)
			wake_sender(trgt_out);

		kref_put(&(trgt_out->ref), free_conn);

		spin_lock_bh(&(nb->stalledconn_lock));
	}
	spin_unlock_bh(&(nb->stalledconn_lock));

	kref_put(&(nb->ref), neighbor_free);
}
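
/* allocate the slab cache used for struct conn_retrans objects */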
int __init cor_snd_init(void)
{
	connretrans_slab = kmem_cache_create("cor_connretrans",
			sizeof(struct conn_retrans), 8, 0, 0);
	if (unlikely(connretrans_slab == 0))
		return -ENOMEM;

	return 0;
}

MODULE_LICENSE("GPL");