net/cor/dev.c
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
 */
16 #include <linux/version.h>
17 #include <linux/kernel.h>
18 #include <linux/init.h>
19 #include <linux/in.h>
20 #include <linux/kthread.h>
23 #include "cor.h"
25 static struct notifier_block cor_netdev_notify;
26 __u8 cor_netdev_notify_registered;
28 __u8 cor_pack_registered;
30 static DEFINE_SPINLOCK(cor_queues_lock);
31 static LIST_HEAD(cor_queues);
32 static LIST_HEAD(cor_queues_waitexit);
34 static void cor_qos_waitexit(struct work_struct *work);
35 DECLARE_WORK(cor_qos_waitexit_work, cor_qos_waitexit);
38 static void _cor_qos_enqueue(struct cor_qos_queue *q,
39 struct cor_resume_block *rb, unsigned long cmsg_send_start_j,
40 ktime_t cmsg_send_start_kt, int caller,
41 int from_nbcongwin_resume, int from_nbnotactive_resume);
44 #ifdef DEBUG_QOS_SLOWSEND
45 static DEFINE_SPINLOCK(slowsend_lock);
46 static unsigned long cor_last_send;
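/*
 * With DEBUG_QOS_SLOWSEND defined, _cor_dev_queue_xmit() throttles the
 * device to roughly one transmitted packet per jiffy (two directly after
 * an idle gap); every other skb is dropped with NET_XMIT_DROP.  This is a
 * debugging aid for exercising the qos queueing paths, not a production
 * rate limiter.
 */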
49 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
51 int allowsend = 0;
52 unsigned long jiffies_tmp;
54 spin_lock_bh(&slowsend_lock);
55 jiffies_tmp = jiffies;
56 if (cor_last_send != jiffies_tmp) {
57 if (cor_last_send + 1 == jiffies_tmp)
58 cor_last_send = jiffies_tmp;
59 else
60 cor_last_send = jiffies_tmp - 1;
61 allowsend = 1;
63 spin_unlock_bh(&slowsend_lock);
65 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d\n", caller, allowsend); */
66 if (allowsend) {
67 return dev_queue_xmit(skb);
68 } else {
69 kfree_skb(skb);
70 return NET_XMIT_DROP;
73 #endif
75 void cor_free_qos(struct kref *ref)
77 struct cor_qos_queue *q = container_of(ref, struct cor_qos_queue, ref);
79 kfree(q);
83 static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked);
85 /**
86 * neighbor congestion window:
87 * increment by 8192 (INCR_PER_RTT) every round trip if more than 2/3 of cwin is used
89 * in case of packet loss decrease by 1/4:
90 * - <= 1/8 immediately and
91 * - <= 1/4 during the next round trip
93 * in case of multiple packet loss events, do not decrement more than once per
94 * round trip
 */
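/*
 * Note: cwin and cwin_shrinkto are kept left-shifted by NBCONGWIN_SHIFT
 * (hence the ">> NBCONGWIN_SHIFT" wherever they are compared against
 * data_intransit), presumably so that per-ack increments smaller than one
 * byte are not lost to integer truncation.
 */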
97 #ifdef COR_NBCONGWIN
99 /*__u64 get_bufspace_used(void);
101 static void print_conn_bufstats(struct cor_neighbor *nb)
103 / * not threadsafe, but this is only for debugging... * /
104 __u64 totalsize = 0;
105 __u64 read_remaining = 0;
106 __u32 numconns = 0;
107 struct list_head *lh;
108 unsigned long iflags;
110 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
112 lh = nb->conns_waiting.lh.next;
113 while (lh != &nb->conns_waiting.lh) {
114 struct cor_conn *cn = container_of(lh, struct cor_conn,
115 trgt.out.rb.lh);
116 totalsize += cn->data_buf.datasize;
117 read_remaining += cn->data_buf.read_remaining;
118 lh = lh->next;
121 lh = nb->conns_waiting.lh_nextpass.next;
122 while (lh != &nb->conns_waiting.lh_nextpass) {
123 struct cor_conn *cn = container_of(lh, struct cor_conn,
124 trgt.out.rb.lh);
125 totalsize += cn->data_buf.datasize;
126 read_remaining += cn->data_buf.read_remaining;
127 lh = lh->next;
130 numconns = nb->conns_waiting.cnt;
132 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
134 printk(KERN_ERR "conn %llu %llu %u\n", totalsize, read_remaining,
135 numconns);
136 } */
138 void cor_nbcongwin_data_retransmitted(struct cor_neighbor *nb,
139 __u64 bytes_sent)
141 __u64 min_cwin = cor_mss_conndata(nb, 0) * 2 << NBCONGWIN_SHIFT;
142 __u64 cwin;
144 unsigned long iflags;
146 spin_lock_irqsave(&nb->nbcongwin.lock, iflags);
148 cwin = atomic64_read(&nb->nbcongwin.cwin);
150 /* printk(KERN_ERR "retrans %llu %llu\n", cwin >> NBCONGWIN_SHIFT,
151 get_bufspace_used());
152 print_conn_bufstats(nb); */
154 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
156 if (nb->nbcongwin.cwin_shrinkto == cwin) {
157 cwin = max(min_cwin, cwin - cwin / 16);
158 atomic64_set(&nb->nbcongwin.cwin, cwin);
161 nb->nbcongwin.cwin_shrinkto = max(min_cwin, cwin - cwin / 16);
163 spin_unlock_irqrestore(&nb->nbcongwin.lock, iflags);
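/*
 * Rough numbers for the additive increase below: incrby works out to
 * bytes_acked * INCR_PER_RTT * CWIN_MUL^2 / cwin_tmp (the three branches
 * only reorder the operations to avoid 64 bit overflow), so acking a full
 * window's worth of data per round trip grows cwin by roughly
 * INCR_PER_RTT (8192) bytes.  Growth is skipped while less than 2/3 of
 * cwin is in transit, and while cwin_shrinkto is below cwin, acks first
 * shrink cwin towards it by bytes_acked / 4.
 */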
166 static __u64 cor_nbcongwin_update_cwin(struct cor_neighbor *nb_cwlocked,
167 __u64 data_intransit, __u64 bytes_acked)
169 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
170 __u32 INCR_PER_RTT = 8192;
172 __u64 cwin = atomic64_read(&nb_cwlocked->nbcongwin.cwin);
174 __u64 cwin_tmp;
175 __u64 incrby;
177 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
178 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT - 2));
180 BUILD_BUG_ON(NBCONGWIN_SHIFT < 2);
182 if (unlikely(shrinkby > cwin))
183 cwin = 0;
184 else
185 cwin -= shrinkby;
187 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
188 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
192 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
193 goto out;
195 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
197 if (unlikely(bytes_acked >= U64_MAX / INCR_PER_RTT / CWIN_MUL))
198 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
199 cwin_tmp / CWIN_MUL / CWIN_MUL);
200 else if (unlikely(bytes_acked >=
201 U64_MAX / INCR_PER_RTT / CWIN_MUL / CWIN_MUL))
202 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
203 cwin_tmp / CWIN_MUL);
204 else
205 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
206 CWIN_MUL, cwin_tmp);
208 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
210 if (unlikely(cwin + incrby < cwin))
211 cwin = U64_MAX;
212 else
213 cwin += incrby;
215 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
216 nb_cwlocked->nbcongwin.cwin_shrinkto))
217 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
218 else
219 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
221 out:
222 atomic64_set(&nb_cwlocked->nbcongwin.cwin, cwin);
224 return cwin;
227 void cor_nbcongwin_data_acked(struct cor_neighbor *nb, __u64 bytes_acked)
229 unsigned long iflags;
230 struct cor_qos_queue *q = nb->queue;
231 __u64 data_intransit;
232 __u64 cwin;
234 spin_lock_irqsave(&nb->nbcongwin.lock, iflags);
236 data_intransit = atomic64_read(&nb->nbcongwin.data_intransit);
238 cwin = cor_nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
240 BUG_ON(bytes_acked > data_intransit);
241 atomic64_sub(bytes_acked, &nb->nbcongwin.data_intransit);
242 data_intransit -= bytes_acked;
244 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
245 goto out_sendnok;
247 spin_lock(&q->qlock);
248 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
249 if (nb->conns_waiting.cnt == 0) {
250 nb->rb.in_queue = RB_INQUEUE_FALSE;
251 } else {
252 _cor_qos_enqueue(q, &nb->rb, 0, ns_to_ktime(0),
253 QOS_CALLER_NEIGHBOR, 1, 0);
256 spin_unlock(&q->qlock);
259 out_sendnok:
260 spin_unlock_irqrestore(&nb->nbcongwin.lock, iflags);
263 void cor_nbcongwin_data_sent(struct cor_neighbor *nb, __u32 bytes_sent)
265 atomic64_add(bytes_sent, &nb->nbcongwin.data_intransit);
268 int cor_nbcongwin_send_allowed(struct cor_neighbor *nb)
270 unsigned long iflags;
271 int ret = 1;
272 struct cor_qos_queue *q = nb->queue;
273 int krefput_queue = 0;
275 if (atomic64_read(&nb->nbcongwin.data_intransit) <=
276 atomic64_read(&nb->nbcongwin.cwin) >> NBCONGWIN_SHIFT)
277 return 1;
279 spin_lock_irqsave(&nb->nbcongwin.lock, iflags);
281 if (atomic64_read(&nb->nbcongwin.data_intransit) <=
282 atomic64_read(&nb->nbcongwin.cwin) >> NBCONGWIN_SHIFT)
283 goto out_ok;
285 ret = 0;
287 spin_lock(&q->qlock);
288 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
289 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
290 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
291 list_del(&nb->rb.lh);
292 cor_nb_kref_put_bug(nb, "qos_queue_nb");
293 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
294 BUG_ON(q->numconns < nb->conns_waiting.cnt);
295 q->numconns -= nb->conns_waiting.cnt;
296 q->priority_sum -= nb->conns_waiting.priority_sum;
297 krefput_queue = 1;
299 cor_qos_queue_set_congstatus(q);
300 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
301 } else {
302 BUG();
304 spin_unlock(&q->qlock);
306 if (krefput_queue != 0)
307 kref_put(&q->ref, cor_free_qos);
309 out_ok:
310 spin_unlock_irqrestore(&nb->nbcongwin.lock, iflags);
312 return ret;
315 #endif
317 static void _cor_resume_conns_accountbusytime(struct cor_conn *trgt_out_l,
318 __u32 priority, __u32 burstprio,
319 unsigned long jiffies_nb_lastduration)
321 unsigned long jiffies_tmp = jiffies;
322 __u64 jiffies_nb_lastduration_shifted = (jiffies_nb_lastduration <<
323 JIFFIES_LAST_IDLE_SHIFT);
324 __u64 burstfactor;
326 if (trgt_out_l->is_highlatency != 0) {
327 burstfactor = 2048;
328 } else {
329 BUG_ON(burstprio < priority);
330 burstfactor = div_u64(1024LL * (__u64) burstprio, priority) * 2 -
331 1024;
332 BUG_ON(burstfactor < 1024);
335 trgt_out_l->trgt.out.jiffies_idle_since +=
336 (jiffies_nb_lastduration_shifted * burstfactor) / 1024;
338 if (time_before(jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT,
339 trgt_out_l->trgt.out.jiffies_idle_since))
340 trgt_out_l->trgt.out.jiffies_idle_since =
341 jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
344 static int _cor_resume_conns_nbnotactive(struct cor_neighbor *nb)
346 struct cor_qos_queue *q = nb->queue;
347 int krefput_queue = 0;
349 spin_lock(&q->qlock);
351 if (unlikely(cor_get_neigh_state(nb) == NEIGHBOR_STATE_ACTIVE)) {
352 spin_unlock(&q->qlock);
353 return 1;
356 if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
357 list_del(&nb->rb.lh);
358 cor_nb_kref_put_bug(nb, "qos_queue_nb");
359 nb->rb.in_queue = RB_INQUEUE_NBNOTACTIVE;
360 BUG_ON(q->numconns < nb->conns_waiting.cnt);
361 q->numconns -= nb->conns_waiting.cnt;
362 q->priority_sum -= nb->conns_waiting.priority_sum;
363 krefput_queue = 1;
365 cor_qos_queue_set_congstatus(q);
368 spin_unlock(&q->qlock);
370 if (krefput_queue != 0)
371 kref_put(&q->ref, cor_free_qos);
373 return 0;
376 unsigned long cor_get_conn_idletime(struct cor_conn *trgt_out_lx)
378 unsigned long jiffies_shifted = jiffies << JIFFIES_LAST_IDLE_SHIFT;
379 __u32 burst_maxidle_secs;
380 __u32 burst_maxidle_hz_shifted;
381 unsigned long idletime_hz_shifted;
383 if (trgt_out_lx->is_highlatency != 0)
384 burst_maxidle_secs = BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS;
385 else
386 burst_maxidle_secs = BURSTPRIO_MAXIDLETIME_LOWLATENCY_SECS;
388 burst_maxidle_hz_shifted = (burst_maxidle_secs * HZ) <<
389 JIFFIES_LAST_IDLE_SHIFT;
391 idletime_hz_shifted = jiffies_shifted -
392 trgt_out_lx->trgt.out.jiffies_idle_since;
394 if (unlikely(idletime_hz_shifted > burst_maxidle_hz_shifted)) {
395 idletime_hz_shifted = burst_maxidle_hz_shifted;
396 trgt_out_lx->trgt.out.jiffies_idle_since = jiffies_shifted -
397 burst_maxidle_hz_shifted;
400 return idletime_hz_shifted;
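/*
 * Worked example for the burst priority below: a low-latency conn that
 * has been idle for the full BURSTPRIO_MAXIDLETIME_LOWLATENCY_SECS window
 * gets burstfactor == 1024 and therefore newprio == 3 * priority, while a
 * continuously busy conn stays at newprio == priority.  High-latency
 * conns get the inverse treatment (roughly 50% of priority when idle,
 * 125% when busy), matching the comment inside the function.
 */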
403 static __u32 _cor_resume_conns_burstprio(struct cor_conn *trgt_out_l,
404 __u32 priority)
406 unsigned long idletime_hz_shifted = cor_get_conn_idletime(trgt_out_l);
407 __u32 idletime_msecs = jiffies_to_msecs(idletime_hz_shifted >>
408 JIFFIES_LAST_IDLE_SHIFT);
409 __u64 newprio;
411 if (trgt_out_l->is_highlatency != 0) {
/*
413 * negative burst for high latency conns:
414 * 50% if idle
415 * 125% if busy
 */
418 __u32 burstfactor;
420 BUG_ON(idletime_msecs >
421 BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * 1000);
422 BUG_ON(BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * 1000LL >
423 U32_MAX / 1024);
425 burstfactor = 768 - (768 * idletime_msecs) /
426 (BURSTPRIO_MAXIDLETIME_HIGHLATENCY_SECS * 1000);
428 newprio = (((__u64) priority) * (512 + burstfactor)) /
429 1024;
430 } else {
431 __u32 burstfactor;
433 BUG_ON(idletime_msecs >
434 BURSTPRIO_MAXIDLETIME_LOWLATENCY_SECS * 1000);
435 BUG_ON(BURSTPRIO_MAXIDLETIME_LOWLATENCY_SECS * 1000LL >
436 U32_MAX / 1024);
438 burstfactor = (1024 * idletime_msecs) /
439 (BURSTPRIO_MAXIDLETIME_LOWLATENCY_SECS * 1000);
441 newprio = (((__u64) priority) * (1024 + burstfactor * 2)) /
442 1024;
445 BUG_ON(newprio > U32_MAX);
446 return (__u32) newprio;
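/*
 * _cor_resume_conns_maxsend() gives each conn a per-round byte budget of
 * bytes_per_round * newpriority * numconns / priority_sum, i.e. its share
 * relative to the average priority on this queue.  For example, with 10
 * queued conns, priority_sum == 10240 and newpriority == 2048, the conn
 * may send 1024 * 2048 * 10 / 10240 == 2048 bytes in this round.
 */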
449 static __u32 _cor_resume_conns_maxsend(struct cor_qos_queue *q,
450 struct cor_conn *trgt_out_l, __u32 newpriority,
451 int *maxsend_forcedelay)
453 unsigned long iflags;
455 struct cor_neighbor *nb = trgt_out_l->trgt.out.nb;
456 __u32 oldpriority = trgt_out_l->trgt.out.rb_priority;
457 __u64 priority_sum;
458 __u32 numconns;
459 __u64 bytes_per_round;
460 __u64 ret;
462 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
463 spin_lock(&q->qlock);
465 if (unlikely(unlikely(trgt_out_l->trgt.out.rb.in_queue !=
466 RB_INQUEUE_TRUE) ||
467 unlikely(nb->rb.in_queue != RB_INQUEUE_TRUE))) {
468 spin_unlock(&q->qlock);
469 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
471 return 1024;
474 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
475 BUG_ON(q->priority_sum < oldpriority);
476 nb->conns_waiting.priority_sum -= oldpriority;
477 q->priority_sum -= oldpriority;
479 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
480 nb->conns_waiting.priority_sum);
481 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
482 nb->conns_waiting.priority_sum += newpriority;
483 q->priority_sum += newpriority;
485 priority_sum = q->priority_sum;
486 numconns = q->numconns;
488 spin_unlock(&q->qlock);
489 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
491 trgt_out_l->trgt.out.rb_priority = newpriority;
493 if (numconns <= 4) {
494 *maxsend_forcedelay = 1;
495 bytes_per_round = 2048;
496 } else {
497 *maxsend_forcedelay = 0;
498 bytes_per_round = 1024;
501 ret = div_u64(bytes_per_round * ((__u64) newpriority) *
502 ((__u64) numconns), priority_sum);
503 if (unlikely(ret > U32_MAX))
504 return U32_MAX;
505 return (__u32) ret;
508 static int _cor_resume_neighbors_nextpass(
509 struct cor_neighbor *nb_waitingconnslocked)
511 BUG_ON(list_empty(&nb_waitingconnslocked->conns_waiting.lh) == 0);
513 if (list_empty(&nb_waitingconnslocked->conns_waiting.lh_nextpass)) {
514 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
515 return 1;
518 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
520 cor_swap_list_items(&nb_waitingconnslocked->conns_waiting.lh,
521 &nb_waitingconnslocked->conns_waiting.lh_nextpass);
523 return 0;
526 static int _cor_resume_neighbors(struct cor_qos_queue *q,
527 struct cor_neighbor *nb, unsigned long jiffies_nb_lastduration,
528 int *progress)
530 unsigned long iflags;
532 while (1) {
533 __u32 priority;
534 __u32 burstprio;
535 __u32 maxsend;
536 int maxsend_forcedelay = 0;
538 int rc2;
539 __u32 sent2 = 0;
541 struct cor_conn *cn = 0;
543 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
544 if (list_empty(&nb->conns_waiting.lh) != 0) {
545 int done = _cor_resume_neighbors_nextpass(nb);
547 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
548 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
550 BUG_ON(nb->conns_waiting.cnt == 0);
552 cn = container_of(nb->conns_waiting.lh.next, struct cor_conn,
553 trgt.out.rb.lh);
554 BUG_ON(cn->targettype != TARGET_OUT);
555 BUG_ON(cn->trgt.out.rb.lh.prev != &nb->conns_waiting.lh);
556 BUG_ON((cn->trgt.out.rb.lh.next == &nb->conns_waiting.lh) &&
557 (nb->conns_waiting.lh.prev !=
558 &cn->trgt.out.rb.lh));
559 list_del(&cn->trgt.out.rb.lh);
560 list_add_tail(&cn->trgt.out.rb.lh,
561 &nb->conns_waiting.lh_nextpass);
562 cor_conn_kref_get(cn, "stack");
563 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
566 priority = cor_conn_refresh_priority(cn, 0);
568 spin_lock_bh(&cn->rcv_lock);
570 if (unlikely(cn->targettype != TARGET_OUT)) {
571 spin_unlock_bh(&cn->rcv_lock);
572 continue;
575 burstprio = _cor_resume_conns_burstprio(cn, priority);
577 maxsend = _cor_resume_conns_maxsend(q, cn, burstprio,
578 &maxsend_forcedelay);
579 if (cn->trgt.out.maxsend_extra >= maxsend)
580 maxsend_forcedelay = 0;
581 maxsend += cn->trgt.out.maxsend_extra;
/* maxsend is __u32, so check for wraparound rather than > U32_MAX */
582 if (unlikely(maxsend < cn->trgt.out.maxsend_extra))
583 maxsend = U32_MAX;
584 if (unlikely(maxsend >= 65536))
585 maxsend_forcedelay = 0;
587 retry:
588 rc2 = _cor_flush_out(cn, maxsend, &sent2, 1,
589 maxsend_forcedelay);
591 if (rc2 == RC_FLUSH_CONN_OUT_OK) {
592 cn->trgt.out.maxsend_extra = 0;
593 cor_qos_remove_conn(cn);
594 } else if (unlikely(rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE)) {
595 if (_cor_resume_conns_nbnotactive(nb) != 0)
596 goto retry;
597 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
598 unlikely(rc2 == RC_FLUSH_CONN_OUT_OOM))) {
599 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
600 if (likely(cn->trgt.out.rb.in_queue !=
601 RB_INQUEUE_FALSE)) {
602 list_del(&cn->trgt.out.rb.lh);
603 list_add(&cn->trgt.out.rb.lh,
604 &nb->conns_waiting.lh);
606 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
607 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
608 unlikely(rc2 == RC_FLUSH_CONN_OUT_OOM)) {
609 cn->trgt.out.maxsend_extra = 0;
610 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
611 if (unlikely(maxsend - sent2 > 65535))
612 cn->trgt.out.maxsend_extra = 65535;
613 else
614 cn->trgt.out.maxsend_extra = maxsend - sent2;
617 if (sent2 != 0)
618 _cor_resume_conns_accountbusytime(cn, priority,
619 burstprio, jiffies_nb_lastduration);
621 spin_unlock_bh(&cn->rcv_lock);
623 if (sent2 != 0) {
624 *progress = 1;
625 cor_wake_sender(cn);
628 cor_conn_kref_put(cn, "stack");
630 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
631 unlikely(rc2 == RC_FLUSH_CONN_OUT_OOM)) {
632 return QOS_RESUME_CONG;
633 } else if (unlikely(rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE)) {
634 return QOS_RESUME_NEXTNEIGHBOR;
639 static struct cor_neighbor *cor_resume_neighbors_peeknextnb(
640 struct cor_qos_queue *q, unsigned long *jiffies_nb_lastduration)
642 unsigned long iflags;
644 struct cor_neighbor *nb;
646 spin_lock_irqsave(&q->qlock, iflags);
648 if (list_empty(&q->neighbors_waiting)) {
649 if (list_empty(&q->neighbors_waiting_nextpass)) {
650 BUG_ON(q->numconns != 0);
651 spin_unlock_irqrestore(&q->qlock, iflags);
653 return 0;
654 } else {
655 unsigned long jiffies_tmp = jiffies;
657 cor_swap_list_items(&q->neighbors_waiting,
658 &q->neighbors_waiting_nextpass);
660 WARN_ONCE(time_before(jiffies_tmp,
661 q->jiffies_nb_pass_start),
662 "cor_resume_neighbors_peeknextnb: jiffies after jiffies_nb_pass_start (this is only a performance issue)");
664 q->jiffies_nb_lastduration = jiffies -
665 q->jiffies_nb_pass_start;
666 q->jiffies_nb_pass_start = jiffies_tmp;
670 *jiffies_nb_lastduration = q->jiffies_nb_lastduration;
673 BUG_ON(q->numconns == 0);
674 BUG_ON(list_empty(&q->neighbors_waiting));
676 nb = container_of(q->neighbors_waiting.next, struct cor_neighbor,
677 rb.lh);
679 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
680 BUG_ON(nb->rb.lh.prev != &q->neighbors_waiting);
681 BUG_ON((nb->rb.lh.next == &q->neighbors_waiting) &&
682 (q->neighbors_waiting.prev != &nb->rb.lh));
684 cor_nb_kref_get(nb, "stack");
686 spin_unlock_irqrestore(&q->qlock, iflags);
688 return nb;
691 static int cor_resume_neighbors(struct cor_qos_queue *q, int *sent)
693 unsigned long iflags;
694 int rc;
696 unsigned long jiffies_nb_lastduration;
697 struct cor_neighbor *nb = cor_resume_neighbors_peeknextnb(q,
698 &jiffies_nb_lastduration);
700 if (nb == 0)
701 return QOS_RESUME_DONE;
703 atomic_set(&nb->cmsg_delay_conndata, 1);
705 rc = _cor_resume_neighbors(q, nb, jiffies_nb_lastduration, sent);
706 if (rc == QOS_RESUME_CONG) {
707 cor_nb_kref_put(nb, "stack");
708 return QOS_RESUME_CONG;
710 BUG_ON(rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR);
712 atomic_set(&nb->cmsg_delay_conndata, 0);
713 spin_lock_bh(&nb->cmsg_lock);
714 cor_schedule_controlmsg_timer(nb);
715 spin_unlock_bh(&nb->cmsg_lock);
717 spin_lock_irqsave(&q->qlock, iflags);
718 if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
719 if (nb->conns_waiting.cnt == 0) {
720 nb->rb.in_queue = RB_INQUEUE_FALSE;
721 list_del(&nb->rb.lh);
722 cor_nb_kref_put_bug(nb, "qos_queue_nb");
723 } else {
724 list_del(&nb->rb.lh);
725 list_add_tail(&nb->rb.lh,
726 &q->neighbors_waiting_nextpass);
729 spin_unlock_irqrestore(&q->qlock, iflags);
731 cor_nb_kref_put(nb, "stack");
733 return QOS_RESUME_NEXTNEIGHBOR;
736 static int __cor_qos_resume(struct cor_qos_queue *q, int caller, int *sent)
738 unsigned long iflags;
739 int rc = QOS_RESUME_DONE;
740 struct list_head *lh;
742 spin_lock_irqsave(&q->qlock, iflags);
744 if (caller == QOS_CALLER_KPACKET)
745 lh = &q->kpackets_waiting;
746 else if (caller == QOS_CALLER_CONN_RETRANS)
747 lh = &q->conn_retrans_waiting;
748 else if (caller == QOS_CALLER_ANNOUNCE)
749 lh = &q->announce_waiting;
750 else
751 BUG();
753 while (list_empty(lh) == 0) {
754 struct cor_resume_block *rb = container_of(lh->next,
755 struct cor_resume_block, lh);
757 unsigned long cmsg_send_start_j;
758 ktime_t cmsg_send_start_kt;
760 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
761 rb->in_queue = RB_INQUEUE_FALSE;
762 list_del(&rb->lh);
764 if (caller == QOS_CALLER_KPACKET) {
765 struct cor_neighbor *nb = container_of(rb,
766 struct cor_neighbor, rb_kp);
767 cmsg_send_start_j = nb->cmsg_send_start_j;
768 cmsg_send_start_kt = nb->cmsg_send_start_kt;
771 spin_unlock_irqrestore(&q->qlock, iflags);
772 if (caller == QOS_CALLER_KPACKET) {
773 rc = cor_send_messages(container_of(rb,
774 struct cor_neighbor, rb_kp),
775 cmsg_send_start_j, cmsg_send_start_kt,
776 sent);
777 } else if (caller == QOS_CALLER_CONN_RETRANS) {
778 rc = cor_send_retrans(container_of(rb,
779 struct cor_neighbor, rb_cr), sent);
780 } else if (caller == QOS_CALLER_ANNOUNCE) {
781 rc = _cor_send_announce(container_of(rb,
782 struct cor_announce_data, rb), 1, sent);
783 } else {
784 BUG();
786 spin_lock_irqsave(&q->qlock, iflags);
788 if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET) {
789 struct cor_neighbor *nb = container_of(rb,
790 struct cor_neighbor, rb_kp);
792 nb->cmsg_send_start_j = cmsg_send_start_j;
793 nb->cmsg_send_start_kt = cmsg_send_start_kt;
796 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
797 rb->in_queue = RB_INQUEUE_TRUE;
798 list_add(&rb->lh, lh);
799 break;
802 if (caller == QOS_CALLER_KPACKET) {
803 cor_nb_kref_put(container_of(rb, struct cor_neighbor,
804 rb_kp), "qos_queue_kpacket");
805 } else if (caller == QOS_CALLER_CONN_RETRANS) {
806 cor_nb_kref_put(container_of(rb, struct cor_neighbor,
807 rb_cr), "qos_queue_conn_retrans");
808 } else if (caller == QOS_CALLER_ANNOUNCE) {
809 kref_put(&container_of(rb, struct cor_announce_data,
810 rb)->ref, cor_announce_data_free);
811 } else {
812 BUG();
815 kref_put(&q->ref, cor_kreffree_bug);
818 spin_unlock_irqrestore(&q->qlock, iflags);
820 return rc;
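/*
 * _cor_qos_resume() drains the four traffic classes in strict priority
 * order: kernel packets, then conn retransmits, then announcements, then
 * conn data.  After each successful resume step the scan restarts at
 * QOS_CALLER_KPACKET, so newly queued control messages preempt the lower
 * classes between rounds.
 */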
823 static int _cor_qos_resume(struct cor_qos_queue *q, int *sent)
825 unsigned long iflags;
826 int i = QOS_CALLER_KPACKET;
827 int rc;
829 spin_lock_irqsave(&q->qlock, iflags);
831 while (1) {
832 if (q->dev == 0) {
833 rc = QOS_RESUME_EXIT;
834 break;
837 if (i == QOS_CALLER_KPACKET &&
838 list_empty(&q->kpackets_waiting)) {
839 i = QOS_CALLER_CONN_RETRANS;
840 continue;
841 } else if (i == QOS_CALLER_CONN_RETRANS &&
842 list_empty(&q->conn_retrans_waiting)) {
843 i = QOS_CALLER_ANNOUNCE;
844 continue;
845 } else if (i == QOS_CALLER_ANNOUNCE &&
846 list_empty(&q->announce_waiting)) {
847 i = QOS_CALLER_NEIGHBOR;
848 continue;
849 } else if (i == QOS_CALLER_NEIGHBOR &&
850 list_empty(&q->neighbors_waiting) &&
851 list_empty(&q->neighbors_waiting_nextpass)) {
852 rc = QOS_RESUME_DONE;
853 break;
856 spin_unlock_irqrestore(&q->qlock, iflags);
858 if (i == QOS_CALLER_NEIGHBOR)
859 rc = cor_resume_neighbors(q, sent);
860 else
861 rc = __cor_qos_resume(q, i, sent);
863 spin_lock_irqsave(&q->qlock, iflags);
865 if (rc == QOS_RESUME_CONG)
866 break;
868 i = QOS_CALLER_KPACKET;
871 if (rc == QOS_RESUME_DONE) {
872 BUG_ON(!list_empty(&q->kpackets_waiting));
873 BUG_ON(!list_empty(&q->conn_retrans_waiting));
874 BUG_ON(!list_empty(&q->announce_waiting));
875 BUG_ON(!list_empty(&q->neighbors_waiting));
876 BUG_ON(!list_empty(&q->neighbors_waiting_nextpass));
878 atomic_set(&q->qos_resume_scheduled, 0);
881 cor_qos_queue_set_congstatus(q);
883 if (q->dev == 0)
884 rc = QOS_RESUME_EXIT;
886 spin_unlock_irqrestore(&q->qlock, iflags);
888 return rc;
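/*
 * The per-device resume thread loops until all queues are drained, then
 * sleeps on qos_resume_wq.  On congestion it backs off for
 * (ms since the last send progress + 8) / 4, clamped to 2..20 ms, so the
 * retry interval grows while the device stays blocked.
 */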
891 int cor_qos_resume_threadfunc(void *data)
893 struct cor_qos_queue *q = (struct cor_qos_queue *) data;
895 while (1) {
896 int sent = 0;
897 int rc;
899 rc = _cor_qos_resume(q, &sent);
901 if (rc == QOS_RESUME_DONE) {
902 wait_event(q->qos_resume_wq,
903 atomic_read(&q->qos_resume_scheduled)
904 != 0);
905 } else if (rc == QOS_RESUME_CONG) {
906 unsigned long jiffies_tmp = jiffies;
907 unsigned long delay_ms = 0;
909 if (sent)
910 q->jiffies_lastprogress = jiffies_tmp;
911 delay_ms = (jiffies_to_msecs(jiffies_tmp -
912 q->jiffies_lastprogress) + 8) / 4;
913 if (delay_ms < 2)
914 delay_ms = 2;
915 else if (delay_ms > 20)
916 delay_ms = 20;
918 msleep(delay_ms);
919 } else if (rc == QOS_RESUME_EXIT) {
920 return 0;
921 } else {
922 BUG();
927 static inline int cor_qos_queue_is_destroyed(struct cor_qos_queue *q_locked)
929 return q_locked->dev == 0;
932 struct cor_qos_queue *cor_get_queue(struct net_device *dev)
934 struct cor_qos_queue *ret = 0;
935 struct list_head *curr;
937 spin_lock_bh(&cor_queues_lock);
938 curr = cor_queues.next;
939 while (curr != (&cor_queues)) {
940 struct cor_qos_queue *q = container_of(curr,
941 struct cor_qos_queue, queue_list);
942 if (q->dev == dev) {
943 ret = q;
944 kref_get(&ret->ref);
945 break;
947 curr = curr->next;
949 spin_unlock_bh(&cor_queues_lock);
950 return ret;
953 static void cor_qos_waitexit(struct work_struct *work)
955 spin_lock_bh(&cor_queues_lock);
956 while (!list_empty(&cor_queues_waitexit)) {
957 struct cor_qos_queue *q = container_of(cor_queues_waitexit.next,
958 struct cor_qos_queue, queue_list);
959 list_del(&q->queue_list);
961 spin_unlock_bh(&cor_queues_lock);
963 kthread_stop(q->qos_resume_thread);
964 put_task_struct(q->qos_resume_thread);
965 kref_put(&q->ref, cor_free_qos);
967 spin_lock_bh(&cor_queues_lock);
969 spin_unlock_bh(&cor_queues_lock);
972 static void _cor_destroy_queue_kpackets(struct cor_qos_queue *q)
974 while (list_empty(&q->kpackets_waiting) == 0) {
975 struct list_head *curr = q->kpackets_waiting.next;
976 struct cor_resume_block *rb = container_of(curr,
977 struct cor_resume_block, lh);
978 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
979 rb->in_queue = RB_INQUEUE_FALSE;
980 list_del(curr);
982 cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb_kp),
983 "qos_queue_kpacket");
984 kref_put(&q->ref, cor_kreffree_bug);
988 static void _cor_destroy_queue_conn_retrans(struct cor_qos_queue *q)
990 while (list_empty(&q->conn_retrans_waiting) == 0) {
991 struct list_head *curr = q->conn_retrans_waiting.next;
992 struct cor_resume_block *rb = container_of(curr,
993 struct cor_resume_block, lh);
994 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
995 rb->in_queue = RB_INQUEUE_FALSE;
996 list_del(curr);
998 cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb_cr),
999 "qos_queue_conn_retrans");
1000 kref_put(&q->ref, cor_kreffree_bug);
1004 static void _cor_destroy_queue_announce(struct cor_qos_queue *q)
1006 while (list_empty(&q->announce_waiting) == 0) {
1007 struct list_head *curr = q->announce_waiting.next;
1008 struct cor_resume_block *rb = container_of(curr,
1009 struct cor_resume_block, lh);
1010 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
1011 rb->in_queue = RB_INQUEUE_FALSE;
1012 list_del(curr);
1014 kref_put(&container_of(rb, struct cor_announce_data, rb)->ref,
1015 cor_announce_data_free);
1016 kref_put(&q->ref, cor_kreffree_bug);
1020 static void _cor_destroy_queue_neighbor(struct cor_qos_queue *q,
1021 struct list_head *lh)
1023 while (list_empty(lh) == 0) {
1024 struct list_head *curr = lh->next;
1025 struct cor_resume_block *rb = container_of(curr,
1026 struct cor_resume_block, lh);
1027 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
1028 rb->in_queue = RB_INQUEUE_FALSE;
1029 list_del(curr);
1031 cor_nb_kref_put(container_of(rb, struct cor_neighbor, rb),
1032 "qos_queue_nb");
1033 kref_put(&q->ref, cor_kreffree_bug);
1037 static struct cor_qos_queue *cor_unlink_queue(struct net_device *dev)
1039 struct cor_qos_queue *ret = 0;
1040 struct list_head *curr;
1042 spin_lock_bh(&cor_queues_lock);
1043 curr = cor_queues.next;
1044 while (curr != (&cor_queues)) {
1045 struct cor_qos_queue *q = container_of(curr,
1046 struct cor_qos_queue, queue_list);
1047 if (dev == 0 || q->dev == dev) {
1048 ret = q;
1049 kref_get(&ret->ref);
1051 list_del(&q->queue_list);
1052 kref_put(&q->ref, cor_kreffree_bug);
1053 break;
1055 curr = curr->next;
1057 spin_unlock_bh(&cor_queues_lock);
1058 return ret;
1061 int cor_destroy_queue(struct net_device *dev)
1063 int rc = 1;
1064 unsigned long iflags;
1066 while (1) {
1067 struct cor_qos_queue *q = cor_unlink_queue(dev);
1069 if (q == 0)
1070 break;
1072 rc = 0;
1074 spin_lock_irqsave(&q->qlock, iflags);
1075 if (q->dev != 0) {
1076 dev_put(q->dev);
1077 q->dev = 0;
1079 _cor_destroy_queue_kpackets(q);
1080 _cor_destroy_queue_conn_retrans(q);
1081 _cor_destroy_queue_announce(q);
1082 _cor_destroy_queue_neighbor(q, &q->neighbors_waiting);
1083 _cor_destroy_queue_neighbor(q, &q->neighbors_waiting_nextpass);
1085 spin_unlock_irqrestore(&q->qlock, iflags);
1087 cor_schedule_qos_resume(q);
1089 spin_lock_bh(&cor_queues_lock);
1090 list_add(&q->queue_list, &cor_queues_waitexit);
1091 spin_unlock_bh(&cor_queues_lock);
1093 schedule_work(&cor_qos_waitexit_work);
1096 return rc;
1099 int cor_create_queue(struct net_device *dev)
1101 struct cor_qos_queue *q = kmalloc(sizeof(struct cor_qos_queue),
1102 GFP_KERNEL);
1104 if (q == 0) {
1105 printk(KERN_ERR "cor: unable to allocate memory for device queue, not enabling device\n");
1106 return 1;
1109 memset(q, 0, sizeof(struct cor_qos_queue));
1111 spin_lock_init(&q->qlock);
1113 kref_init(&q->ref);
1115 q->dev = dev;
1116 dev_hold(dev);
1118 atomic_set(&q->qos_resume_scheduled, 0);
1120 init_waitqueue_head(&q->qos_resume_wq);
1122 INIT_LIST_HEAD(&q->kpackets_waiting);
1123 INIT_LIST_HEAD(&q->conn_retrans_waiting);
1124 INIT_LIST_HEAD(&q->announce_waiting);
1125 INIT_LIST_HEAD(&q->neighbors_waiting);
1126 INIT_LIST_HEAD(&q->neighbors_waiting_nextpass);
1128 atomic_set(&q->cong_status, 0);
1130 q->qos_resume_thread = kthread_create(cor_qos_resume_threadfunc,
1131 q, "cor_qos_resume");
/* kthread_create() returns an ERR_PTR(), never NULL, on failure */
1132 if (IS_ERR(q->qos_resume_thread)) {
1133 printk(KERN_ERR "cor: unable to start qos_resume thread\n");
1135 if (q->dev != 0) {
1136 dev_put(q->dev);
1137 q->dev = 0;
1140 kref_put(&q->ref, cor_free_qos);
1142 return 1;
1144 get_task_struct(q->qos_resume_thread);
1145 wake_up_process(q->qos_resume_thread);
1147 spin_lock_bh(&cor_queues_lock);
1148 list_add(&q->queue_list, &cor_queues);
1149 spin_unlock_bh(&cor_queues_lock);
1151 return 0;
1154 static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked)
1156 __u32 newstatus;
1158 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ / 50)) {
1159 newstatus = CONGSTATUS_NONE;
1160 } else if (list_empty(&q_locked->kpackets_waiting) == 0) {
1161 newstatus = CONGSTATUS_KPACKETS;
1162 } else if (list_empty(&q_locked->conn_retrans_waiting) == 0) {
1163 newstatus = CONGSTATUS_RETRANS;
1164 } else if (list_empty(&q_locked->announce_waiting) == 0) {
1165 newstatus = CONGSTATUS_ANNOUNCE;
1166 } else if (list_empty(&q_locked->neighbors_waiting) == 0 ||
1167 list_empty(&q_locked->neighbors_waiting_nextpass) ==
1168 0) {
1169 newstatus = CONGSTATUS_CONNDATA;
1170 } else {
1171 newstatus = CONGSTATUS_NONE;
1174 atomic_set(&q_locked->cong_status, newstatus);
1177 void cor_qos_set_lastdrop(struct cor_qos_queue *q)
1179 unsigned long iflags;
1181 spin_lock_irqsave(&q->qlock, iflags);
1182 q->jiffies_lastdrop = jiffies;
1183 cor_qos_queue_set_congstatus(q);
1184 spin_unlock_irqrestore(&q->qlock, iflags);
/**
1188 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
1189 * caller
 */
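/*
 * rb->in_queue records why a resume block is (not) queued:
 * RB_INQUEUE_TRUE means it is on one of the qos queue lists,
 * RB_INQUEUE_NBCONGWIN parks a neighbor until acks free congestion
 * window space (see cor_nbcongwin_data_acked()), and
 * RB_INQUEUE_NBNOTACTIVE parks it while the neighbor is not in the
 * active state.  A plain enqueue leaves both parked states untouched
 * (note the early returns below).
 */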
1191 static void _cor_qos_enqueue(struct cor_qos_queue *q,
1192 struct cor_resume_block *rb, unsigned long cmsg_send_start_j,
1193 ktime_t cmsg_send_start_kt, int caller,
1194 int from_nbcongwin_resume, int from_nbnotactive_resume)
1196 int queues_empty;
1198 if (rb->in_queue == RB_INQUEUE_TRUE) {
1199 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
1201 if (caller == QOS_CALLER_KPACKET) {
1202 struct cor_neighbor *nb = container_of(rb,
1203 struct cor_neighbor, rb_kp);
1204 if (time_before(cmsg_send_start_j,
1205 nb->cmsg_send_start_j))
1206 nb->cmsg_send_start_j = cmsg_send_start_j;
1207 if (ktime_before(cmsg_send_start_kt,
1208 nb->cmsg_send_start_kt))
1209 nb->cmsg_send_start_kt = cmsg_send_start_kt;
1211 return;
1212 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
1213 from_nbcongwin_resume == 0) {
1214 return;
1215 } else if (rb->in_queue == RB_INQUEUE_NBNOTACTIVE) {
1216 return;
1219 if (unlikely(cor_qos_queue_is_destroyed(q)))
1220 return;
1222 queues_empty = list_empty(&q->kpackets_waiting) &&
1223 list_empty(&q->conn_retrans_waiting) &&
1224 list_empty(&q->announce_waiting) &&
1225 list_empty(&q->neighbors_waiting) &&
1226 list_empty(&q->neighbors_waiting_nextpass);
1228 BUG_ON(!queues_empty && atomic_read(&q->qos_resume_scheduled) == 0);
1230 if (caller == QOS_CALLER_KPACKET) {
1231 struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
1232 rb_kp);
1233 nb->cmsg_send_start_j = cmsg_send_start_j;
1234 nb->cmsg_send_start_kt = cmsg_send_start_kt;
1235 list_add_tail(&rb->lh, &q->kpackets_waiting);
1236 cor_nb_kref_get(nb, "qos_queue_kpacket");
1237 } else if (caller == QOS_CALLER_CONN_RETRANS) {
1238 list_add_tail(&rb->lh, &q->conn_retrans_waiting);
1239 cor_nb_kref_get(container_of(rb, struct cor_neighbor, rb_cr),
1240 "qos_queue_conn_retrans");
1241 } else if (caller == QOS_CALLER_ANNOUNCE) {
1242 list_add_tail(&rb->lh, &q->announce_waiting);
1243 kref_get(&container_of(rb, struct cor_announce_data, rb)->ref);
1244 } else if (caller == QOS_CALLER_NEIGHBOR) {
1245 struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
1246 rb);
1247 if (unlikely(nb->conns_waiting.cnt == 0))
1248 return;
1250 list_add_tail(&rb->lh, &q->neighbors_waiting_nextpass);
1251 cor_nb_kref_get(nb, "qos_queue_nb");
1252 q->numconns += nb->conns_waiting.cnt;
1253 q->priority_sum += nb->conns_waiting.priority_sum;
1254 q->jiffies_nb_lastduration = 0;
1255 q->jiffies_nb_pass_start = jiffies;
1256 } else {
1257 BUG();
1259 rb->in_queue = RB_INQUEUE_TRUE;
1260 kref_get(&q->ref);
1262 cor_schedule_qos_resume(q);
1264 cor_qos_queue_set_congstatus(q);
1267 void cor_qos_enqueue(struct cor_qos_queue *q, struct cor_resume_block *rb,
1268 unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
1269 int caller, int from_nbnotactive_resume)
1271 unsigned long iflags;
1273 spin_lock_irqsave(&q->qlock, iflags);
1274 _cor_qos_enqueue(q, rb, cmsg_send_start_j, cmsg_send_start_kt,
1275 caller, 0, from_nbnotactive_resume);
1276 spin_unlock_irqrestore(&q->qlock, iflags);
1279 void cor_qos_remove_conn(struct cor_conn *trgt_out_lx)
1281 unsigned long iflags;
1282 struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;
1283 struct cor_qos_queue *q = nb->queue;
1284 int sched_cmsg = 0;
1285 int krefput_nb = 0;
1287 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1288 BUG_ON(q == 0);
1290 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
1291 if (trgt_out_lx->trgt.out.rb.in_queue == RB_INQUEUE_FALSE) {
1292 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
1293 return;
1295 spin_lock(&q->qlock);
1297 trgt_out_lx->trgt.out.rb.in_queue = RB_INQUEUE_FALSE;
1298 list_del(&trgt_out_lx->trgt.out.rb.lh);
1299 BUG_ON(nb->conns_waiting.cnt == 0);
1300 nb->conns_waiting.cnt--;
1301 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1302 BUG_ON(q->numconns == 0);
1303 q->numconns--;
1306 BUG_ON(nb->conns_waiting.priority_sum <
1307 trgt_out_lx->trgt.out.rb_priority);
1308 BUG_ON(q->priority_sum < trgt_out_lx->trgt.out.rb_priority);
1309 nb->conns_waiting.priority_sum -=
1310 trgt_out_lx->trgt.out.rb_priority;
1311 q->priority_sum -= trgt_out_lx->trgt.out.rb_priority;
1312 trgt_out_lx->trgt.out.rb_priority = 0;
1314 if (list_empty(&nb->conns_waiting.lh) &&
1315 list_empty(&nb->conns_waiting.lh_nextpass)) {
1316 BUG_ON(nb->conns_waiting.priority_sum != 0);
1317 BUG_ON(nb->conns_waiting.cnt != 0);
1318 } else {
1319 BUG_ON(nb->conns_waiting.cnt == 0);
1322 if (list_empty(&nb->conns_waiting.lh) &&
1323 list_empty(&nb->conns_waiting.lh_nextpass) &&
1324 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1325 nb->rb.in_queue = RB_INQUEUE_FALSE;
1326 list_del(&nb->rb.lh);
1327 if (atomic_read(&nb->cmsg_delay_conndata) != 0) {
1328 atomic_set(&nb->cmsg_delay_conndata, 0);
1329 sched_cmsg = 1;
1331 krefput_nb = 1;
1333 BUG_ON(list_empty(&q->neighbors_waiting) &&
1334 list_empty(&q->neighbors_waiting_nextpass) &&
1335 q->numconns != 0);
1336 BUG_ON(list_empty(&q->neighbors_waiting) &&
1337 list_empty(&q->neighbors_waiting_nextpass) &&
1338 q->priority_sum != 0);
1340 cor_qos_queue_set_congstatus(q);
1343 spin_unlock(&q->qlock);
1344 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
1346 if (sched_cmsg) {
1347 spin_lock_bh(&nb->cmsg_lock);
1348 cor_schedule_controlmsg_timer(nb);
1349 spin_unlock_bh(&nb->cmsg_lock);
1352 cor_conn_kref_put_bug(trgt_out_lx, "qos_queue");
1354 if (krefput_nb)
1355 cor_nb_kref_put(nb, "qos_queue_nb");
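/*
 * Conn data queueing is two-level: a conn waiting to send is put on its
 * neighbor's conns_waiting list here, and the neighbor itself is put on
 * the device queue's neighbors_waiting(_nextpass) lists via
 * _cor_qos_enqueue() with QOS_CALLER_NEIGHBOR.  The resume thread then
 * walks neighbors round robin and conns within each neighbor round robin,
 * using the _nextpass lists to delimit passes.
 */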
1358 void cor_qos_enqueue_conn(struct cor_conn *trgt_out_lx)
1360 unsigned long iflags;
1361 struct cor_neighbor *nb = trgt_out_lx->trgt.out.nb;
1362 struct cor_qos_queue *q;
1364 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1366 spin_lock_irqsave(&nb->conns_waiting.lock, iflags);
1368 if (trgt_out_lx->trgt.out.rb.in_queue != RB_INQUEUE_FALSE)
1369 goto out;
1371 trgt_out_lx->trgt.out.rb.in_queue = RB_INQUEUE_TRUE;
1372 list_add_tail(&trgt_out_lx->trgt.out.rb.lh,
1373 &nb->conns_waiting.lh);
1374 cor_conn_kref_get(trgt_out_lx, "qos_queue");
1375 nb->conns_waiting.cnt++;
1377 q = trgt_out_lx->trgt.out.nb->queue;
1378 spin_lock(&q->qlock);
1379 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1380 q->numconns++;
1381 } else {
1382 _cor_qos_enqueue(q, &nb->rb, 0, ns_to_ktime(0),
1383 QOS_CALLER_NEIGHBOR, 0, 0);
1385 spin_unlock(&q->qlock);
1387 out:
1388 spin_unlock_irqrestore(&nb->conns_waiting.lock, iflags);
1391 struct sk_buff *cor_create_packet(struct cor_neighbor *nb, int size,
1392 gfp_t alloc_flags)
1394 struct sk_buff *ret;
1396 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1397 nb->dev->needed_tailroom, alloc_flags);
1398 if (unlikely(ret == 0))
1399 return 0;
1401 ret->protocol = htons(ETH_P_COR);
1402 ret->dev = nb->dev;
1404 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1405 if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1406 nb->dev->dev_addr, ret->len) < 0)) {
/* do not leak the freshly allocated skb if the header cannot be built */
kfree_skb(ret);
1407 return 0;
}
1408 skb_reset_network_header(ret);
1410 return ret;
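/*
 * Conn data packets carry a 9 byte header in front of the payload:
 *   byte  0    PACKET_TYPE_CONNDATA plus the flush / windowused flag bits
 *   bytes 1-4  conn_id (written with cor_put_u32())
 *   bytes 5-8  seqno   (32 bit, written with cor_put_u32())
 */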
1413 struct sk_buff *cor_create_packet_conndata(struct cor_neighbor *nb, int size,
1414 gfp_t alloc_flags, __u32 conn_id, __u32 seqno,
1415 __u8 windowused, __u8 flush)
1417 struct sk_buff *ret;
1418 char *dest;
1420 ret = cor_create_packet(nb, size + 9, alloc_flags);
1421 if (unlikely(ret == 0))
1422 return 0;
1424 dest = skb_put(ret, 9);
1425 BUG_ON(dest == 0);
1427 BUG_ON((windowused & (~PACKET_TYPE_CONNDATA_FLAGS_WINDOWUSED)) != 0);
1429 dest[0] = PACKET_TYPE_CONNDATA |
1430 (flush == 0 ? 0 : PACKET_TYPE_CONNDATA_FLAGS_FLUSH) |
1431 windowused;
1432 dest += 1;
1434 cor_put_u32(dest, conn_id);
1435 dest += 4;
1436 cor_put_u32(dest, seqno);
1437 dest += 4;
1439 return ret;
1443 static void cor_rcv_conndata(struct sk_buff *skb, __u8 windowused, __u8 flush)
1445 struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);
1447 __u32 conn_id;
1448 __u32 seqno;
1450 char *connid_p;
1451 char *seqno_p;
1453 /* __u8 rand; */
1455 if (unlikely(nb == 0))
1456 goto drop;
1458 connid_p = cor_pull_skb(skb, 4);
1459 if (unlikely(connid_p == 0))
1460 goto drop;
1462 seqno_p = cor_pull_skb(skb, 4);
1463 if (unlikely(seqno_p == 0))
1464 goto drop;
1466 conn_id = cor_parse_u32(connid_p);
1467 seqno = cor_parse_u32(seqno_p);
1469 /* get_random_bytes(&rand, 1);
1470 if (rand < 64)
1471 goto drop; */
1473 if (unlikely(skb->len <= 0))
1474 goto drop;
1476 cor_conn_rcv(nb, skb, 0, 0, conn_id, seqno, windowused, flush);
1478 if (0) {
1479 drop:
1480 kfree_skb(skb);
1483 if (nb != 0) {
1484 cor_nb_kref_put(nb, "stack");
1488 static void cor_rcv_cmsg(struct sk_buff *skb, int ackneeded)
1490 struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);
1492 if (unlikely(nb == 0)) {
1493 kfree_skb(skb);
1494 } else {
1495 cor_kernel_packet(nb, skb, ackneeded);
1496 cor_nb_kref_put(nb, "stack");
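/*
 * cor_rcv() is the packet_type handler for ETH_P_COR frames.  It
 * dispatches on the first byte: announce packets, control messages with
 * three ack-urgency levels, and conn data (where the remaining bits of
 * the type byte encode the flush and windowused flags).  Unknown types
 * are silently dropped.
 */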
1500 static int cor_rcv(struct sk_buff *skb, struct net_device *dev,
1501 struct packet_type *pt, struct net_device *orig_dev)
1503 __u8 packet_type;
1504 char *packet_type_p;
1506 if (skb->pkt_type == PACKET_OTHERHOST ||
1507 unlikely(skb->pkt_type == PACKET_LOOPBACK))
1508 goto drop;
1510 packet_type_p = cor_pull_skb(skb, 1);
1512 if (unlikely(packet_type_p == 0))
1513 goto drop;
1515 packet_type = *packet_type_p;
1517 if (unlikely(packet_type == PACKET_TYPE_ANNOUNCE)) {
1518 cor_rcv_announce(skb);
1519 return NET_RX_SUCCESS;
1520 } else if (packet_type == PACKET_TYPE_CMSG_NOACK) {
1521 cor_rcv_cmsg(skb, ACK_NEEDED_NO);
1522 return NET_RX_SUCCESS;
1523 } else if (packet_type == PACKET_TYPE_CMSG_ACKSLOW) {
1524 cor_rcv_cmsg(skb, ACK_NEEDED_SLOW);
1525 return NET_RX_SUCCESS;
1526 } else if (packet_type == PACKET_TYPE_CMSG_ACKFAST) {
1527 cor_rcv_cmsg(skb, ACK_NEEDED_FAST);
1528 return NET_RX_SUCCESS;
1529 } else if (likely((packet_type & (~PACKET_TYPE_CONNDATA_FLAGS)) ==
1530 PACKET_TYPE_CONNDATA)) {
1531 __u8 flush = 0;
1532 __u8 windowused;
1534 if ((packet_type & PACKET_TYPE_CONNDATA_FLAGS_FLUSH) != 0)
1535 flush = 1;
1536 windowused = (packet_type &
1537 PACKET_TYPE_CONNDATA_FLAGS_WINDOWUSED);
1538 cor_rcv_conndata(skb, windowused, flush);
1539 return NET_RX_SUCCESS;
1540 } else {
1541 kfree_skb(skb);
1542 return NET_RX_SUCCESS;
1545 drop:
1546 kfree_skb(skb);
1547 return NET_RX_DROP;
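/*
 * The netdevice notifier drives the per-device lifecycle: NETDEV_UP
 * allocates the qos queue and (in non-client mode) starts broadcasting
 * announcements, NETDEV_DOWN stops announcements, resets all neighbors on
 * the device and tears the queue down, and NETDEV_CHANGEMTU triggers a
 * rcvmtu resend.
 */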
1550 int cor_netdev_notify_func(struct notifier_block *not, unsigned long event,
1551 void *ptr)
1553 struct net_device *dev = netdev_notifier_info_to_dev(ptr);
1554 int rc;
1556 BUG_ON(dev == 0);
1558 switch (event) {
1559 case NETDEV_UP:
1560 if (dev->flags & IFF_LOOPBACK)
1561 break;
1563 rc = cor_create_queue(dev);
1564 if (rc == 1)
1565 return 1;
1566 if (cor_is_clientmode() == 0)
1567 cor_announce_send_start(dev, dev->broadcast,
1568 ANNOUNCE_TYPE_BROADCAST);
1569 break;
1570 case NETDEV_DOWN:
1571 printk(KERN_ERR "down 1\n");
1572 udelay(100);
1573 printk(KERN_ERR "down 2\n");
1574 udelay(100);
1575 cor_announce_send_stop(dev, 0, ANNOUNCE_TYPE_BROADCAST);
1576 printk(KERN_ERR "down 3\n");
1577 udelay(100);
1578 cor_reset_neighbors(dev);
1579 printk(KERN_ERR "down 4\n");
1580 udelay(100);
1581 cor_destroy_queue(dev);
1582 printk(KERN_ERR "down 5\n");
1583 udelay(100);
1584 break;
1585 case NETDEV_CHANGEMTU:
1586 cor_resend_rcvmtu(dev);
1587 break;
1588 case NETDEV_REBOOT:
1589 case NETDEV_CHANGE:
1590 case NETDEV_REGISTER:
1591 case NETDEV_UNREGISTER:
1592 case NETDEV_CHANGEADDR:
1593 case NETDEV_GOING_DOWN:
1594 case NETDEV_CHANGENAME:
1595 case NETDEV_FEAT_CHANGE:
1596 case NETDEV_BONDING_FAILOVER:
1597 break;
1598 default:
1599 return 1;
1602 return 0;
1605 static struct packet_type cor_ptype = {
1606 .type = htons(ETH_P_COR),
1607 .dev = 0,
1608 .func = cor_rcv
1611 void cor_dev_down(void)
1613 if (cor_pack_registered != 0) {
1614 cor_pack_registered = 0;
1615 dev_remove_pack(&cor_ptype);
1618 if (cor_netdev_notify_registered != 0) {
1619 if (unregister_netdevice_notifier(&cor_netdev_notify) != 0) {
1620 printk(KERN_WARNING "warning: cor_dev_down: unregister_netdevice_notifier failed\n");
1621 BUG();
1623 cor_netdev_notify_registered = 0;
1627 int cor_dev_up(void)
1629 BUG_ON(cor_netdev_notify_registered != 0);
1630 if (register_netdevice_notifier(&cor_netdev_notify) != 0)
1631 return 1;
1632 cor_netdev_notify_registered = 1;
1634 BUG_ON(cor_pack_registered != 0);
1635 dev_add_pack(&cor_ptype);
1636 cor_pack_registered = 1;
1638 return 0;
1641 int __init cor_dev_init(void)
1643 memset(&cor_netdev_notify, 0, sizeof(cor_netdev_notify));
1644 cor_netdev_notify.notifier_call = cor_netdev_notify_func;
1646 return 0;
1649 void __exit cor_dev_exit1(void)
1651 flush_work(&cor_qos_waitexit_work);
1654 MODULE_LICENSE("GPL");