/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/in.h>
#include <linux/kthread.h>

#include "cor.h"

static struct notifier_block cor_netdev_notify;
__u8 cor_netdev_notify_registered = 0;

__u8 cor_pack_registered = 0;

static DEFINE_SPINLOCK(cor_queues_lock);
static LIST_HEAD(cor_queues);
static LIST_HEAD(cor_queues_waitexit);

static void cor_qos_waitexit(struct work_struct *work);
DECLARE_WORK(cor_qos_waitexit_work, cor_qos_waitexit);

static void _cor_qos_enqueue(struct cor_qos_queue *q,
		struct cor_resume_block *rb, unsigned long cmsg_send_start_j,
		ktime_t cmsg_send_start_kt,
		int caller, int from_nbcongwin_resume);
#ifdef DEBUG_QOS_SLOWSEND
static DEFINE_SPINLOCK(slowsend_lock);
static unsigned long cor_last_send;

int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
{
	int allowsend = 0;
	unsigned long jiffies_tmp;

	spin_lock_bh(&slowsend_lock);
	jiffies_tmp = jiffies;
	if (cor_last_send != jiffies_tmp) {
		if (cor_last_send + 1 == jiffies_tmp) {
			cor_last_send = jiffies_tmp;
		} else {
			cor_last_send = jiffies_tmp - 1;
		}
		allowsend = 1;
	}
	spin_unlock_bh(&slowsend_lock);

	/* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
	if (allowsend) {
		return dev_queue_xmit(skb);
	} else {
		kfree_skb(skb);
		return NET_XMIT_DROP;
	}
}
#endif
void cor_free_qos(struct kref *ref)
{
	struct cor_qos_queue *q = container_of(ref, struct cor_qos_queue, ref);

	kfree(q);
}


static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked);
/**
 * neighbor congestion window:
 * increment by 4096 every round trip if more than 2/3 of cwin is used
 *
 * in case of packet loss decrease by 1/4:
 * - <= 1/8 immediately and
 * - <= 1/4 during the next round trip
 *
 * in case of multiple packet loss events, do not decrement more than once per
 * round trip
 */
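/*
 * Note on units: cwin and cwin_shrinkto below are kept in fixed-point form,
 * shifted left by NBCONGWIN_SHIFT, while data_intransit counts raw bytes, so
 * cwin is shifted right by NBCONGWIN_SHIFT before it is compared against
 * byte counts.
 */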
#ifdef COR_NBCONGWIN

/*extern __u64 get_bufspace_used(void);

static void print_conn_bufstats(struct cor_neighbor *nb)
{
	/ * not threadsafe, but this is only for debugging... * /
	__u64 totalsize = 0;
	__u64 read_remaining = 0;
	__u32 numconns = 0;
	struct list_head *lh;
	unsigned long iflags;

	spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);

	lh = nb->conns_waiting.lh.next;
	while (lh != &(nb->conns_waiting.lh)) {
		struct cor_conn *cn = container_of(lh, struct cor_conn,
				target.out.rb.lh);
		totalsize += cn->data_buf.datasize;
		read_remaining += cn->data_buf.read_remaining;
		lh = lh->next;
	}

	lh = nb->conns_waiting.lh_nextpass.next;
	while (lh != &(nb->conns_waiting.lh_nextpass)) {
		struct cor_conn *cn = container_of(lh, struct cor_conn,
				target.out.rb.lh);
		totalsize += cn->data_buf.datasize;
		read_remaining += cn->data_buf.read_remaining;
		lh = lh->next;
	}

	numconns = nb->conns_waiting.cnt;

	spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);

	printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining,
			numconns);
} */
void cor_nbcongwin_data_retransmitted(struct cor_neighbor *nb,
		__u64 bytes_sent)
{
	__u64 min_cwin = cor_mss_conndata(nb, 0)*2 << NBCONGWIN_SHIFT;
	__u64 cwin;

	unsigned long iflags;

	spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);

	cwin = atomic64_read(&(nb->nbcongwin.cwin));

	/* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
			get_bufspace_used());
	print_conn_bufstats(nb); */

	BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);

	if (nb->nbcongwin.cwin_shrinkto == cwin) {
		cwin = max(min_cwin, cwin - cwin/16);
		atomic64_set(&(nb->nbcongwin.cwin), cwin);
	}

	nb->nbcongwin.cwin_shrinkto = max(min_cwin, cwin - cwin/16);

	spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
}
static __u64 cor_nbcongwin_update_cwin(struct cor_neighbor *nb_cwlocked,
		__u64 data_intransit, __u64 bytes_acked)
{
	__u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
	__u32 INCR_PER_RTT = 8192;

	__u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));

	__u64 cwin_tmp;
	__u64 incrby;

	if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
		__u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
		if (unlikely(shrinkby > cwin))
			cwin = 0;
		else
			cwin -= shrinkby;

		if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
			cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
	}

	if (cwin * 2 > data_intransit * CWIN_MUL * 3)
		goto out;

	cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);

	if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
		incrby = div64_u64(bytes_acked * INCR_PER_RTT,
				cwin_tmp / CWIN_MUL / CWIN_MUL);
	else if (unlikely(bytes_acked >=
			U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
		incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
				cwin_tmp / CWIN_MUL);
	else
		incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
				CWIN_MUL, cwin_tmp);

	BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);

	if (unlikely(cwin + incrby < cwin))
		cwin = U64_MAX;
	else
		cwin += incrby;

	if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
			nb_cwlocked->nbcongwin.cwin_shrinkto))
		nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
	else
		nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;

out:
	atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);

	return cwin;
}
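/*
 * Called when the neighbor acks data: grows (or finishes shrinking) the
 * congestion window via cor_nbcongwin_update_cwin(), subtracts the acked
 * bytes from data_intransit and, if the neighbor was parked in the
 * RB_INQUEUE_NBCONGWIN state and there is now room below cwin, puts it back
 * on the send queue (or clears the flag if it has no waiting conns).
 */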
void cor_nbcongwin_data_acked(struct cor_neighbor *nb, __u64 bytes_acked)
{
	unsigned long iflags;
	struct cor_qos_queue *q = nb->queue;
	__u64 data_intransit;
	__u64 cwin;

	spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);

	data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));

	cwin = cor_nbcongwin_update_cwin(nb, data_intransit, bytes_acked);

	BUG_ON(bytes_acked > data_intransit);
	atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
	data_intransit -= bytes_acked;

	if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
		goto out_sendnok;

	spin_lock(&(q->qlock));
	if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
		if (nb->conns_waiting.cnt == 0) {
			nb->rb.in_queue = RB_INQUEUE_FALSE;
		} else {
			_cor_qos_enqueue(q, &(nb->rb), 0, ns_to_ktime(0),
					QOS_CALLER_NEIGHBOR, 1);
		}
	}
	spin_unlock(&(q->qlock));

out_sendnok:
	spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
}
void cor_nbcongwin_data_sent(struct cor_neighbor *nb, __u32 bytes_sent)
{
	atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
}

int cor_nbcongwin_send_allowed(struct cor_neighbor *nb)
{
	unsigned long iflags;
	int ret = 1;
	struct cor_qos_queue *q = nb->queue;
	int krefput_queue = 0;

	if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
			atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
		return 1;

	spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);

	if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
			atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
		goto out_ok;

	ret = 0;

	spin_lock(&(q->qlock));
	if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
		nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
	} else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
		list_del(&(nb->rb.lh));
		kref_put(&(nb->ref), cor_kreffree_bug);
		nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
		BUG_ON(q->numconns < nb->conns_waiting.cnt);
		q->numconns -= nb->conns_waiting.cnt;
		q->priority_sum -= nb->conns_waiting.priority_sum;
		krefput_queue = 1;

		cor_qos_queue_set_congstatus(q);
	} else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
	} else {
		BUG();
	}
	spin_unlock(&(q->qlock));

	if (krefput_queue != 0)
		kref_put(&(q->ref), cor_free_qos);

out_ok:
	spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);

	return ret;
}

#endif
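/*
 * Conn data scheduling: each serviced conn is charged for the time the last
 * neighbor pass took, scaled by how much its burst priority exceeded its
 * base priority. The resulting "busy" time reduces the idle time that
 * _cor_resume_conns_burstprio() later converts into a priority boost.
 */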
static void _cor_resume_conns_accountbusytime(struct cor_conn *trgt_out_l,
		__u32 priority, __u32 burstprio,
		unsigned long jiffies_nb_lastduration)
{
	unsigned long jiffies_tmp = jiffies;
	__u64 jiffies_last_idle_mul = (1LL << JIFFIES_LAST_IDLE_SHIFT);
	__u64 burstfactor;
	__u64 jiffies_shifted_busy;

	BUG_ON(burstprio < priority);

	burstfactor = div_u64(1024LL * (__u64) burstprio, priority);
	BUG_ON(burstfactor < 1024);
	burstfactor = 1024 + (burstfactor - 1024) * 2;

	jiffies_shifted_busy = (jiffies_nb_lastduration * burstfactor *
			jiffies_last_idle_mul) / 1024;

	BUG_ON(BURSTPRIO_MAXIDLETIME_SECS >
			(1 << 30) / (HZ*jiffies_last_idle_mul));

	if (unlikely(jiffies_shifted_busy > HZ * BURSTPRIO_MAXIDLETIME_SECS *
			jiffies_last_idle_mul))
		trgt_out_l->target.out.jiffies_idle_since =
				jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
	else
		trgt_out_l->target.out.jiffies_idle_since +=
				jiffies_shifted_busy;

	if (unlikely(time_before(jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT,
			trgt_out_l->target.out.jiffies_idle_since)))
		trgt_out_l->target.out.jiffies_idle_since =
				jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
}
unsigned long cor_get_conn_idletime(struct cor_conn *trgt_out_l)
{
	unsigned long jiffies_shifted = jiffies << JIFFIES_LAST_IDLE_SHIFT;
	__u32 burst_maxidle_hz_shifted = (BURSTPRIO_MAXIDLETIME_SECS*HZ) <<
			JIFFIES_LAST_IDLE_SHIFT;
	unsigned long idletime_hz_shifted;

	if (unlikely(time_before(jiffies_shifted,
			trgt_out_l->target.out.jiffies_idle_since))) {
		idletime_hz_shifted = 0;
		trgt_out_l->target.out.jiffies_idle_since = jiffies_shifted -
				burst_maxidle_hz_shifted;
	} else {
		idletime_hz_shifted = jiffies_shifted -
				trgt_out_l->target.out.jiffies_idle_since;

		if (unlikely(idletime_hz_shifted > burst_maxidle_hz_shifted)) {
			idletime_hz_shifted = burst_maxidle_hz_shifted;
			trgt_out_l->target.out.jiffies_idle_since =
					jiffies_shifted -
					burst_maxidle_hz_shifted;
		}
	}

	return idletime_hz_shifted;
}
static __u32 _cor_resume_conns_burstprio(struct cor_conn *trgt_out_l,
		__u32 priority)
{
	unsigned long idletime_hz_shifted = cor_get_conn_idletime(trgt_out_l);
	__u32 idletime_msecs = jiffies_to_msecs(idletime_hz_shifted >>
			JIFFIES_LAST_IDLE_SHIFT);
	__u32 burstfactor;
	__u64 newprio;

	BUG_ON(idletime_msecs > BURSTPRIO_MAXIDLETIME_SECS*1000);
	BUG_ON(BURSTPRIO_MAXIDLETIME_SECS*1000LL > U32_MAX / 1024);

	burstfactor = (1024 * idletime_msecs) /
			(BURSTPRIO_MAXIDLETIME_SECS * 1000);

	if (trgt_out_l->is_highlatency != 0)
		newprio = (((__u64) priority) * (1024 + 1 * burstfactor)) /
				1024;
	else
		newprio = (((__u64) priority) * (1024 + 2 * burstfactor)) /
				1024;

	BUG_ON(newprio > U32_MAX);
	return (__u32) newprio;
}
static __u64 _cor_resume_conns_maxsend(struct cor_qos_queue *q,
		struct cor_conn *trgt_out_l, __u32 newpriority,
		int *maxsend_forcedelay)
{
	unsigned long iflags;

	struct cor_neighbor *nb = trgt_out_l->target.out.nb;
	__u32 oldpriority = trgt_out_l->target.out.rb_priority;
	__u64 priority_sum;
	__u32 numconns;
	__u64 bytes_per_round;

	spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
	spin_lock(&(q->qlock));

	if (unlikely(unlikely(trgt_out_l->target.out.rb.in_queue !=
			RB_INQUEUE_TRUE) ||
			unlikely(nb->rb.in_queue != RB_INQUEUE_TRUE))) {
		spin_unlock(&(q->qlock));
		spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);

		return 1024LL;
	}

	BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
	BUG_ON(q->priority_sum < oldpriority);
	nb->conns_waiting.priority_sum -= oldpriority;
	q->priority_sum -= oldpriority;

	BUG_ON(nb->conns_waiting.priority_sum + newpriority <
			nb->conns_waiting.priority_sum);
	BUG_ON(q->priority_sum + newpriority < q->priority_sum);
	nb->conns_waiting.priority_sum += newpriority;
	q->priority_sum += newpriority;

	priority_sum = q->priority_sum;
	numconns = q->numconns;

	spin_unlock(&(q->qlock));
	spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);

	trgt_out_l->target.out.rb_priority = newpriority;

	if (numconns <= 4) {
		*maxsend_forcedelay = 1;
		bytes_per_round = 2048LL;
	} else {
		*maxsend_forcedelay = 0;
		bytes_per_round = 1024LL;
	}

	if (trgt_out_l->is_highlatency != 0)
		bytes_per_round += bytes_per_round/8;

	return div_u64(bytes_per_round * ((__u64) newpriority) *
			((__u64) numconns), priority_sum);
}
static int _cor_resume_neighbors_nextpass(
		struct cor_neighbor *nb_waitingconnslocked)
{
	BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);

	if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
		BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
		return 1;
	}

	BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);

	cor_swap_list_items(&(nb_waitingconnslocked->conns_waiting.lh),
			&(nb_waitingconnslocked->conns_waiting.lh_nextpass));

	return 0;
}
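/*
 * Round robin over the conns of one neighbor: each conn that is serviced is
 * moved from conns_waiting.lh to conns_waiting.lh_nextpass, and once lh runs
 * empty the two lists are swapped by _cor_resume_neighbors_nextpass() so the
 * next pass starts over.
 */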
static int _cor_resume_neighbors(struct cor_qos_queue *q,
		struct cor_neighbor *nb, unsigned long jiffies_nb_lastduration,
		int *progress)
{
	unsigned long iflags;

	while (1) {
		__u32 priority;
		__u32 burstprio;
		__u32 maxsend;
		int maxsend_forcedelay = 0;

		int rc2;
		__u32 sent2 = 0;

		struct cor_conn *cn = 0;

		spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
		if (list_empty(&(nb->conns_waiting.lh)) != 0) {
			int done = _cor_resume_neighbors_nextpass(nb);

			spin_unlock_irqrestore(&(nb->conns_waiting.lock),
					iflags);
			return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
		}
		BUG_ON(nb->conns_waiting.cnt == 0);

		cn = container_of(nb->conns_waiting.lh.next, struct cor_conn,
				target.out.rb.lh);
		BUG_ON(cn->targettype != TARGET_OUT);
		BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
		BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
				(nb->conns_waiting.lh.prev !=
				&(cn->target.out.rb.lh)));
		list_del(&(cn->target.out.rb.lh));
		list_add_tail(&(cn->target.out.rb.lh),
				&(nb->conns_waiting.lh_nextpass));
		kref_get(&(cn->ref));
		spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);

		priority = cor_refresh_conn_priority(cn, 0);

		spin_lock_bh(&(cn->rcv_lock));

		if (unlikely(cn->targettype != TARGET_OUT)) {
			spin_unlock_bh(&(cn->rcv_lock));
			continue;
		}

		burstprio = _cor_resume_conns_burstprio(cn, priority);

		maxsend = _cor_resume_conns_maxsend(q, cn, burstprio,
				&maxsend_forcedelay);
		if (cn->target.out.maxsend_extra >= maxsend)
			maxsend_forcedelay = 0;
		maxsend += cn->target.out.maxsend_extra;
		if (unlikely(maxsend > U32_MAX))
			maxsend = U32_MAX;
		if (unlikely(maxsend >= 65536))
			maxsend_forcedelay = 0;

		rc2 = _cor_flush_out(cn, maxsend, &sent2, 1,
				maxsend_forcedelay);

		if (rc2 == RC_FLUSH_CONN_OUT_OK ||
				rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
			cn->target.out.maxsend_extra = 0;
			cor_qos_remove_conn(cn);
		} else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
				rc2 == RC_FLUSH_CONN_OUT_OOM)) {
			spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
			if (likely(cn->target.out.rb.in_queue !=
					RB_INQUEUE_FALSE)) {
				list_del(&(cn->target.out.rb.lh));
				list_add(&(cn->target.out.rb.lh),
						&(nb->conns_waiting.lh));
			}
			spin_unlock_irqrestore(&(nb->conns_waiting.lock),
					iflags);
		} else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
				rc2 == RC_FLUSH_CONN_OUT_OOM) {
			cn->target.out.maxsend_extra = 0;
		} else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
			if (unlikely(maxsend - sent2 > 65535))
				cn->target.out.maxsend_extra = 65535;
			else
				cn->target.out.maxsend_extra = maxsend - sent2;
		}

		if (sent2 != 0)
			_cor_resume_conns_accountbusytime(cn, priority,
					burstprio, jiffies_nb_lastduration);

		spin_unlock_bh(&(cn->rcv_lock));

		if (sent2 != 0) {
			*progress = 1;
			cor_wake_sender(cn);
		}

		kref_put(&(cn->ref), cor_free_conn);

		if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
				rc2 == RC_FLUSH_CONN_OUT_OOM) {
			return QOS_RESUME_CONG;
		}
	}
}
static struct cor_neighbor *cor_resume_neighbors_peeknextnb(
		struct cor_qos_queue *q, unsigned long *jiffies_nb_lastduration)
{
	unsigned long iflags;

	struct cor_neighbor *nb;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (list_empty(&(q->neighbors_waiting))) {
		if (list_empty(&(q->neighbors_waiting_nextpass))) {
			BUG_ON(q->numconns != 0);
			spin_unlock_irqrestore(&(q->qlock), iflags);

			return 0;
		} else {
			unsigned long jiffies_tmp = jiffies;

			cor_swap_list_items(&(q->neighbors_waiting),
					&(q->neighbors_waiting_nextpass));

			WARN_ONCE(time_before(jiffies_tmp,
					q->jiffies_nb_pass_start),
					"cor_resume_neighbors_peeknextnb: jiffies after jiffies_nb_pass_start (this is only a performance issue)");

			q->jiffies_nb_lastduration = jiffies -
					q->jiffies_nb_pass_start;
			q->jiffies_nb_pass_start = jiffies_tmp;
		}
	}

	*jiffies_nb_lastduration = q->jiffies_nb_lastduration;

	BUG_ON(q->numconns == 0);
	BUG_ON(list_empty(&(q->neighbors_waiting)));

	nb = container_of(q->neighbors_waiting.next, struct cor_neighbor,
			rb.lh);

	BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
	BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
	BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
			(q->neighbors_waiting.prev != &(nb->rb.lh)));

	kref_get(&(nb->ref));

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return nb;
}
static int cor_resume_neighbors(struct cor_qos_queue *q, int *sent)
{
	unsigned long iflags;
	int rc;

	unsigned long jiffies_nb_lastduration;
	struct cor_neighbor *nb = cor_resume_neighbors_peeknextnb(q,
			&jiffies_nb_lastduration);

	if (nb == 0)
		return QOS_RESUME_DONE;

	atomic_set(&(nb->cmsg_delay_conndata), 1);

	rc = _cor_resume_neighbors(q, nb, jiffies_nb_lastduration, sent);
	if (rc == QOS_RESUME_CONG) {
		kref_put(&(nb->ref), cor_neighbor_free);
		return QOS_RESUME_CONG;
	}
	BUG_ON(rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR);

	atomic_set(&(nb->cmsg_delay_conndata), 0);
	spin_lock_bh(&(nb->cmsg_lock));
	cor_schedule_controlmsg_timer(nb);
	spin_unlock_bh(&(nb->cmsg_lock));

	spin_lock_irqsave(&(q->qlock), iflags);
	if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
		if (nb->conns_waiting.cnt == 0) {
			nb->rb.in_queue = RB_INQUEUE_FALSE;
			list_del(&(nb->rb.lh));
			kref_put(&(nb->ref), cor_kreffree_bug);
		} else {
			list_del(&(nb->rb.lh));
			list_add_tail(&(nb->rb.lh),
					&(q->neighbors_waiting_nextpass));
		}
	}
	spin_unlock_irqrestore(&(q->qlock), iflags);

	kref_put(&(nb->ref), cor_neighbor_free);

	return QOS_RESUME_NEXTNEIGHBOR;
}
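/*
 * Resume one category of queued senders (kernel packets, conn retransmits or
 * announcements) for this device queue. Entries are dequeued one at a time,
 * sent with the qlock dropped, and requeued at the head of their list if the
 * send reports congestion.
 */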
static int __cor_qos_resume(struct cor_qos_queue *q, int caller, int *sent)
{
	unsigned long iflags;
	int rc = QOS_RESUME_DONE;
	struct list_head *lh;

	spin_lock_irqsave(&(q->qlock), iflags);

	if (caller == QOS_CALLER_KPACKET)
		lh = &(q->kpackets_waiting);
	else if (caller == QOS_CALLER_CONN_RETRANS)
		lh = &(q->conn_retrans_waiting);
	else if (caller == QOS_CALLER_ANNOUNCE)
		lh = &(q->announce_waiting);
	else
		BUG();

	while (list_empty(lh) == 0) {
		struct cor_resume_block *rb = container_of(lh->next,
				struct cor_resume_block, lh);

		unsigned long cmsg_send_start_j;
		ktime_t cmsg_send_start_kt;

		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(&(rb->lh));

		if (caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);
			cmsg_send_start_j = nb->cmsg_send_start_j;
			cmsg_send_start_kt = nb->cmsg_send_start_kt;
		}

		spin_unlock_irqrestore(&(q->qlock), iflags);
		if (caller == QOS_CALLER_KPACKET) {
			rc = cor_send_messages(container_of(rb,
					struct cor_neighbor, rb_kp),
					cmsg_send_start_j, cmsg_send_start_kt,
					sent);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			rc = cor_send_retrans(container_of(rb,
					struct cor_neighbor, rb_cr), sent);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			rc = _cor_send_announce(container_of(rb,
					struct cor_announce_data, rb), 1, sent);
		} else {
			BUG();
		}
		spin_lock_irqsave(&(q->qlock), iflags);

		if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);

			nb->cmsg_send_start_j = cmsg_send_start_j;
			nb->cmsg_send_start_kt = cmsg_send_start_kt;
		}

		if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
			rb->in_queue = RB_INQUEUE_TRUE;
			list_add(&(rb->lh), lh);
			break;
		}

		if (caller == QOS_CALLER_KPACKET) {
			kref_put(&(container_of(rb, struct cor_neighbor,
					rb_kp)->ref), cor_neighbor_free);
		} else if (caller == QOS_CALLER_CONN_RETRANS) {
			kref_put(&(container_of(rb, struct cor_neighbor,
					rb_cr)->ref), cor_neighbor_free);
		} else if (caller == QOS_CALLER_ANNOUNCE) {
			kref_put(&(container_of(rb,
					struct cor_announce_data, rb)->ref),
					cor_announce_data_free);
		} else {
			BUG();
		}

		kref_put(&(q->ref), cor_kreffree_bug);
	}

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}
static int _cor_qos_resume(struct cor_qos_queue *q, int *sent)
{
	unsigned long iflags;
	int i = QOS_CALLER_KPACKET;
	int rc;

	spin_lock_irqsave(&(q->qlock), iflags);

	while (1) {
		if (q->dev == 0) {
			rc = QOS_RESUME_EXIT;
			break;
		}

		if (i == QOS_CALLER_KPACKET &&
				list_empty(&(q->kpackets_waiting))) {
			i = QOS_CALLER_CONN_RETRANS;
			continue;
		} else if (i == QOS_CALLER_CONN_RETRANS &&
				list_empty(&(q->conn_retrans_waiting))) {
			i = QOS_CALLER_ANNOUNCE;
			continue;
		} else if (i == QOS_CALLER_ANNOUNCE &&
				list_empty(&(q->announce_waiting))) {
			i = QOS_CALLER_NEIGHBOR;
			continue;
		} else if (i == QOS_CALLER_NEIGHBOR &&
				list_empty(&(q->neighbors_waiting)) &&
				list_empty(&(q->neighbors_waiting_nextpass))) {
			rc = QOS_RESUME_DONE;
			break;
		}

		spin_unlock_irqrestore(&(q->qlock), iflags);

		if (i == QOS_CALLER_NEIGHBOR) {
			rc = cor_resume_neighbors(q, sent);
		} else {
			rc = __cor_qos_resume(q, i, sent);
		}

		spin_lock_irqsave(&(q->qlock), iflags);

		if (rc == QOS_RESUME_CONG)
			break;

		i = QOS_CALLER_KPACKET;
	}

	if (rc == QOS_RESUME_DONE) {
		BUG_ON(!list_empty(&(q->kpackets_waiting)));
		BUG_ON(!list_empty(&(q->conn_retrans_waiting)));
		BUG_ON(!list_empty(&(q->announce_waiting)));
		BUG_ON(!list_empty(&(q->neighbors_waiting)));
		BUG_ON(!list_empty(&(q->neighbors_waiting_nextpass)));

		atomic_set(&(q->qos_resume_scheduled), 0);
	}

	cor_qos_queue_set_congstatus(q);

	if (q->dev == 0)
		rc = QOS_RESUME_EXIT;

	spin_unlock_irqrestore(&(q->qlock), iflags);

	return rc;
}
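/*
 * Per-queue resume thread: runs _cor_qos_resume() until everything is sent,
 * sleeps on qos_resume_wq until qos_resume_scheduled becomes nonzero again,
 * and on congestion backs off with a short msleep() that grows with the time
 * since the last progress (clamped to 2..20 ms).
 */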
int cor_qos_resume_threadfunc(void *data)
{
	struct cor_qos_queue *q = (struct cor_qos_queue *) data;

	while (1) {
		int sent = 0;
		int rc;

		rc = _cor_qos_resume(q, &sent);

		if (rc == QOS_RESUME_DONE) {
			wait_event(q->qos_resume_wq,
					atomic_read(&(q->qos_resume_scheduled))
					!= 0);
		} else if (rc == QOS_RESUME_CONG) {
			unsigned long jiffies_tmp = jiffies;
			unsigned long delay_ms = 0;

			if (sent)
				q->jiffies_lastprogress = jiffies_tmp;
			delay_ms = (jiffies_to_msecs(jiffies_tmp -
					q->jiffies_lastprogress) + 8) / 4;
			if (delay_ms < 2) {
				delay_ms = 2;
			} else if (delay_ms > 20) {
				delay_ms = 20;
			}

			msleep(delay_ms);
		} else if (rc == QOS_RESUME_EXIT) {
			return 0;
		} else {
			BUG();
		}
	}
}
static inline int cor_qos_queue_is_destroyed(struct cor_qos_queue *q_locked)
{
	return q_locked->dev == 0;
}

struct cor_qos_queue *cor_get_queue(struct net_device *dev)
{
	struct cor_qos_queue *ret = 0;
	struct list_head *curr;

	spin_lock_bh(&cor_queues_lock);
	curr = cor_queues.next;
	while (curr != (&cor_queues)) {
		struct cor_qos_queue *q = container_of(curr,
				struct cor_qos_queue, queue_list);
		if (q->dev == dev) {
			ret = q;
			kref_get(&(ret->ref));
			break;
		}
		curr = curr->next;
	}
	spin_unlock_bh(&cor_queues_lock);
	return ret;
}
static void cor_qos_waitexit(struct work_struct *work)
{
	spin_lock_bh(&cor_queues_lock);
	while (!list_empty(&cor_queues_waitexit)) {
		struct cor_qos_queue *q = container_of(cor_queues_waitexit.next,
				struct cor_qos_queue, queue_list);
		list_del(&(q->queue_list));

		spin_unlock_bh(&cor_queues_lock);

		kthread_stop(q->qos_resume_thread);
		put_task_struct(q->qos_resume_thread);
		kref_put(&(q->ref), cor_free_qos);

		spin_lock_bh(&cor_queues_lock);
	}
	spin_unlock_bh(&cor_queues_lock);
}
static void _cor_destroy_queue_kpackets(struct cor_qos_queue *q)
{
	while (list_empty(&(q->kpackets_waiting)) == 0) {
		struct list_head *curr = q->kpackets_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		kref_put(&(container_of(rb, struct cor_neighbor, rb_kp)->ref),
				cor_neighbor_free);
		kref_put(&(q->ref), cor_kreffree_bug);
	}
}

static void _cor_destroy_queue_conn_retrans(struct cor_qos_queue *q)
{
	while (list_empty(&(q->conn_retrans_waiting)) == 0) {
		struct list_head *curr = q->conn_retrans_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		kref_put(&(container_of(rb, struct cor_neighbor, rb_cr)->ref),
				cor_neighbor_free);
		kref_put(&(q->ref), cor_kreffree_bug);
	}
}

static void _cor_destroy_queue_announce(struct cor_qos_queue *q)
{
	while (list_empty(&(q->announce_waiting)) == 0) {
		struct list_head *curr = q->announce_waiting.next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		kref_put(&(container_of(rb, struct cor_announce_data, rb)->ref),
				cor_announce_data_free);
		kref_put(&(q->ref), cor_kreffree_bug);
	}
}

static void _cor_destroy_queue_neighbor(struct cor_qos_queue *q,
		struct list_head *lh)
{
	while (list_empty(lh) == 0) {
		struct list_head *curr = lh->next;
		struct cor_resume_block *rb = container_of(curr,
				struct cor_resume_block, lh);
		BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
		rb->in_queue = RB_INQUEUE_FALSE;
		list_del(curr);

		kref_put(&(container_of(rb, struct cor_neighbor, rb)->ref),
				cor_neighbor_free);
		kref_put(&(q->ref), cor_kreffree_bug);
	}
}
static struct cor_qos_queue *cor_unlink_queue(struct net_device *dev)
{
	struct cor_qos_queue *ret = 0;
	struct list_head *curr;

	spin_lock_bh(&cor_queues_lock);
	curr = cor_queues.next;
	while (curr != (&cor_queues)) {
		struct cor_qos_queue *q = container_of(curr,
				struct cor_qos_queue, queue_list);
		if (dev == 0 || q->dev == dev) {
			ret = q;
			kref_get(&(ret->ref));

			list_del(&(q->queue_list));
			kref_put(&(q->ref), cor_kreffree_bug);
			break;
		}
		curr = curr->next;
	}
	spin_unlock_bh(&cor_queues_lock);
	return ret;
}
int cor_destroy_queue(struct net_device *dev)
{
	int rc = 1;
	unsigned long iflags;

	while (1) {
		struct cor_qos_queue *q = cor_unlink_queue(dev);

		if (q == 0)
			break;

		rc = 0;

		spin_lock_irqsave(&(q->qlock), iflags);
		if (q->dev != 0) {
			dev_put(q->dev);
			q->dev = 0;
		}
		_cor_destroy_queue_kpackets(q);
		_cor_destroy_queue_conn_retrans(q);
		_cor_destroy_queue_announce(q);
		_cor_destroy_queue_neighbor(q, &(q->neighbors_waiting));
		_cor_destroy_queue_neighbor(q, &(q->neighbors_waiting_nextpass));

		spin_unlock_irqrestore(&(q->qlock), iflags);

		cor_schedule_qos_resume(q);

		spin_lock_bh(&cor_queues_lock);
		list_add(&(q->queue_list), &cor_queues_waitexit);
		spin_unlock_bh(&cor_queues_lock);

		schedule_work(&cor_qos_waitexit_work);
	}

	return rc;
}
int cor_create_queue(struct net_device *dev)
{
	struct cor_qos_queue *q = kmalloc(sizeof(struct cor_qos_queue),
			GFP_KERNEL);

	if (q == 0) {
		printk(KERN_ERR "cor: unable to allocate memory for device "
				"queue, not enabling device");
		return 1;
	}

	memset(q, 0, sizeof(struct cor_qos_queue));

	spin_lock_init(&(q->qlock));

	kref_init(&(q->ref));

	q->dev = dev;
	dev_hold(dev);

	atomic_set(&(q->qos_resume_scheduled), 0);

	init_waitqueue_head(&(q->qos_resume_wq));

	INIT_LIST_HEAD(&(q->kpackets_waiting));
	INIT_LIST_HEAD(&(q->conn_retrans_waiting));
	INIT_LIST_HEAD(&(q->announce_waiting));
	INIT_LIST_HEAD(&(q->neighbors_waiting));
	INIT_LIST_HEAD(&(q->neighbors_waiting_nextpass));

	atomic_set(&(q->cong_status), 0);

	q->qos_resume_thread = kthread_create(cor_qos_resume_threadfunc,
			q, "cor_qos_resume");
	if (q->qos_resume_thread == 0) {
		printk(KERN_ERR "cor: unable to start qos_resume thread");

		if (q->dev != 0) {
			dev_put(q->dev);
			q->dev = 0;
		}

		kref_put(&(q->ref), cor_free_qos);

		return 1;
	}
	get_task_struct(q->qos_resume_thread);
	wake_up_process(q->qos_resume_thread);

	spin_lock_bh(&cor_queues_lock);
	list_add(&(q->queue_list), &cor_queues);
	spin_unlock_bh(&cor_queues_lock);

	return 0;
}
static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked)
{
	__u32 newstatus;

	if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
		newstatus = CONGSTATUS_NONE;
	} else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
		newstatus = CONGSTATUS_KPACKETS;
	} else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
		newstatus = CONGSTATUS_RETRANS;
	} else if (list_empty(&(q_locked->announce_waiting)) == 0) {
		newstatus = CONGSTATUS_ANNOUNCE;
	} else if (list_empty(&(q_locked->neighbors_waiting)) == 0 ||
			list_empty(&(q_locked->neighbors_waiting_nextpass)) ==
			0) {
		newstatus = CONGSTATUS_CONNDATA;
	} else {
		newstatus = CONGSTATUS_NONE;
	}

	atomic_set(&(q_locked->cong_status), newstatus);
}

void cor_qos_set_lastdrop(struct cor_qos_queue *q)
{
	unsigned long iflags;

	spin_lock_irqsave(&(q->qlock), iflags);
	q->jiffies_lastdrop = jiffies;
	cor_qos_queue_set_congstatus(q);
	spin_unlock_irqrestore(&(q->qlock), iflags);
}
/**
 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
 * caller
 */
static void _cor_qos_enqueue(struct cor_qos_queue *q,
		struct cor_resume_block *rb, unsigned long cmsg_send_start_j,
		ktime_t cmsg_send_start_kt, int caller,
		int from_nbcongwin_resume)
{
	int queues_empty;

	if (rb->in_queue == RB_INQUEUE_TRUE) {
		BUG_ON(caller == QOS_CALLER_NEIGHBOR);

		if (caller == QOS_CALLER_KPACKET) {
			struct cor_neighbor *nb = container_of(rb,
					struct cor_neighbor, rb_kp);
			if (time_before(cmsg_send_start_j,
					nb->cmsg_send_start_j))
				nb->cmsg_send_start_j = cmsg_send_start_j;
			if (ktime_before(cmsg_send_start_kt,
					nb->cmsg_send_start_kt))
				nb->cmsg_send_start_kt = cmsg_send_start_kt;
		}
		return;
	} else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
			from_nbcongwin_resume == 0) {
		return;
	}

	if (unlikely(cor_qos_queue_is_destroyed(q)))
		return;

	queues_empty = list_empty(&(q->kpackets_waiting)) &&
			list_empty(&(q->conn_retrans_waiting)) &&
			list_empty(&(q->announce_waiting)) &&
			list_empty(&(q->neighbors_waiting)) &&
			list_empty(&(q->neighbors_waiting_nextpass));

	BUG_ON(!queues_empty && atomic_read(&(q->qos_resume_scheduled)) == 0);

	rb->in_queue = RB_INQUEUE_TRUE;

	if (caller == QOS_CALLER_KPACKET) {
		struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
				rb_kp);
		nb->cmsg_send_start_j = cmsg_send_start_j;
		nb->cmsg_send_start_kt = cmsg_send_start_kt;
		list_add_tail(&(rb->lh), &(q->kpackets_waiting));
		kref_get(&(nb->ref));
	} else if (caller == QOS_CALLER_CONN_RETRANS) {
		list_add_tail(&(rb->lh), &(q->conn_retrans_waiting));
		kref_get(&(container_of(rb, struct cor_neighbor, rb_cr)->ref));
	} else if (caller == QOS_CALLER_ANNOUNCE) {
		list_add_tail(&(rb->lh), &(q->announce_waiting));
		kref_get(&(container_of(rb, struct cor_announce_data, rb)->ref));
	} else if (caller == QOS_CALLER_NEIGHBOR) {
		struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
				rb);
		list_add_tail(&(rb->lh), &(q->neighbors_waiting_nextpass));
		kref_get(&(nb->ref));
		BUG_ON(nb->conns_waiting.cnt == 0);
		q->numconns += nb->conns_waiting.cnt;
		q->priority_sum += nb->conns_waiting.priority_sum;
		q->jiffies_nb_lastduration = 0;
		q->jiffies_nb_pass_start = jiffies;
	} else {
		BUG();
	}
	kref_get(&(q->ref));

	cor_schedule_qos_resume(q);

	cor_qos_queue_set_congstatus(q);
}
void cor_qos_enqueue(struct cor_qos_queue *q, struct cor_resume_block *rb,
		unsigned long cmsg_send_start_j, ktime_t cmsg_send_start_kt,
		int caller)
{
	unsigned long iflags;

	spin_lock_irqsave(&(q->qlock), iflags);
	_cor_qos_enqueue(q, rb, cmsg_send_start_j, cmsg_send_start_kt,
			caller, 0);
	spin_unlock_irqrestore(&(q->qlock), iflags);
}
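/*
 * Take a conn off its neighbor's conns_waiting list and drop its share of
 * the priority bookkeeping; if this was the neighbor's last waiting conn,
 * the neighbor itself is dequeued and delayed control messages are kicked
 * via cor_schedule_controlmsg_timer().
 */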
void cor_qos_remove_conn(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
	struct cor_qos_queue *q = nb->queue;
	int sched_cmsg = 0;
	int krefput_nb = 0;

	BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
	BUG_ON(q == 0);

	spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
	if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
		spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
		return;
	}
	spin_lock(&(q->qlock));

	trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
	list_del(&(trgt_out_lx->target.out.rb.lh));
	BUG_ON(nb->conns_waiting.cnt == 0);
	nb->conns_waiting.cnt--;
	if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
		BUG_ON(q->numconns == 0);
		q->numconns--;
	}

	BUG_ON(nb->conns_waiting.priority_sum <
			trgt_out_lx->target.out.rb_priority);
	BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
	nb->conns_waiting.priority_sum -=
			trgt_out_lx->target.out.rb_priority;
	q->priority_sum -= trgt_out_lx->target.out.rb_priority;
	trgt_out_lx->target.out.rb_priority = 0;

	if (list_empty(&(nb->conns_waiting.lh)) &&
			list_empty(&(nb->conns_waiting.lh_nextpass))) {
		BUG_ON(nb->conns_waiting.priority_sum != 0);
		BUG_ON(nb->conns_waiting.cnt != 0);
	} else {
		BUG_ON(nb->conns_waiting.cnt == 0);
	}

	if (list_empty(&(nb->conns_waiting.lh)) &&
			list_empty(&(nb->conns_waiting.lh_nextpass)) &&
			nb->rb.in_queue == RB_INQUEUE_TRUE) {
		nb->rb.in_queue = RB_INQUEUE_FALSE;
		list_del(&(nb->rb.lh));
		if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
			atomic_set(&(nb->cmsg_delay_conndata), 0);
			sched_cmsg = 1;
		}

		krefput_nb = 1;

		BUG_ON(list_empty(&(q->neighbors_waiting)) &&
				list_empty(&(q->neighbors_waiting_nextpass)) &&
				q->numconns != 0);
		BUG_ON(list_empty(&(q->neighbors_waiting)) &&
				list_empty(&(q->neighbors_waiting_nextpass)) &&
				q->priority_sum != 0);

		cor_qos_queue_set_congstatus(q);
	}

	spin_unlock(&(q->qlock));
	spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);

	if (sched_cmsg) {
		spin_lock_bh(&(nb->cmsg_lock));
		cor_schedule_controlmsg_timer(nb);
		spin_unlock_bh(&(nb->cmsg_lock));
	}

	kref_put(&(trgt_out_lx->ref), cor_kreffree_bug);

	if (krefput_nb)
		kref_put(&(nb->ref), cor_neighbor_free);
}
void cor_qos_enqueue_conn(struct cor_conn *trgt_out_lx)
{
	unsigned long iflags;
	struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
	struct cor_qos_queue *q;

	BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);

	spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);

	if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
		goto out;

	trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
	list_add_tail(&(trgt_out_lx->target.out.rb.lh),
			&(nb->conns_waiting.lh));
	kref_get(&(trgt_out_lx->ref));
	nb->conns_waiting.cnt++;

	q = trgt_out_lx->target.out.nb->queue;
	spin_lock(&(q->qlock));
	if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
		q->numconns++;
	} else {
		_cor_qos_enqueue(q, &(nb->rb), 0, ns_to_ktime(0),
				QOS_CALLER_NEIGHBOR, 0);
	}
	spin_unlock(&(q->qlock));

out:
	spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
}
struct sk_buff *cor_create_packet(struct cor_neighbor *nb, int size,
		gfp_t alloc_flags)
{
	struct sk_buff *ret;

	ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
			nb->dev->needed_tailroom, alloc_flags);
	if (unlikely(ret == 0))
		return 0;

	ret->protocol = htons(ETH_P_COR);
	ret->dev = nb->dev;

	skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
	if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
			nb->dev->dev_addr, ret->len) < 0)) {
		/* free the freshly allocated skb so it is not leaked */
		kfree_skb(ret);
		return 0;
	}
	skb_reset_network_header(ret);

	return ret;
}
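/*
 * Conn data packets carry an 11 byte header in front of the payload:
 * 1 byte packet type, 4 byte conn_id and a 6 byte seqno, written below with
 * cor_put_u32()/cor_put_u48().
 */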
struct sk_buff *cor_create_packet_conndata(struct cor_neighbor *nb, int size,
		gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
		__u8 snd_delayed_lowbuf, __u8 flush)
{
	struct sk_buff *ret;
	char *dest;

	ret = cor_create_packet(nb, size + 11, alloc_flags);
	if (unlikely(ret == 0))
		return 0;

	dest = skb_put(ret, 11);
	BUG_ON(dest == 0);

	if (flush != 0) {
		if (snd_delayed_lowbuf != 0) {
			dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
		} else {
			dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
		}
	} else {
		if (snd_delayed_lowbuf != 0) {
			dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
		} else {
			dest[0] = PACKET_TYPE_CONNDATA;
		}
	}
	dest += 1;

	cor_put_u32(dest, conn_id);
	dest += 4;
	cor_put_u48(dest, seqno);
	dest += 6;

	return ret;
}
static void cor_rcv_conndata(struct sk_buff *skb, int rcv_delayed_lowbuf,
		__u8 flush)
{
	struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);

	__u32 conn_id;
	__u64 seqno;

	char *connid_p;
	char *seqno_p;

	/* __u8 rand; */

	if (unlikely(nb == 0))
		goto drop;

	connid_p = cor_pull_skb(skb, 4);
	if (unlikely(connid_p == 0))
		goto drop;

	seqno_p = cor_pull_skb(skb, 6);
	if (unlikely(seqno_p == 0))
		goto drop;

	conn_id = cor_parse_u32(connid_p);
	seqno = cor_parse_u48(seqno_p);

	/* get_random_bytes(&rand, 1);
	if (rand < 64)
		goto drop; */

	if (unlikely(skb->len <= 0))
		goto drop;

	cor_conn_rcv(nb, skb, 0, 0, conn_id, seqno, rcv_delayed_lowbuf, flush);

	if (0) {
drop:
		kfree_skb(skb);
	}

	if (nb != 0) {
		kref_put(&(nb->ref), cor_neighbor_free);
	}
}
static void cor_rcv_cmsg(struct sk_buff *skb, int ackneeded)
{
	struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);

	if (unlikely(nb == 0)) {
		kfree_skb(skb);
	} else {
		cor_kernel_packet(nb, skb, ackneeded);
		kref_put(&(nb->ref), cor_neighbor_free);
	}
}
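/*
 * Receive entry point for ETH_P_COR frames: the first byte selects the
 * packet type and the skb is handed to the announce, control message or
 * conn data path accordingly.
 */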
static int cor_rcv(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	__u8 packet_type;
	char *packet_type_p;

	if (skb->pkt_type == PACKET_OTHERHOST ||
			unlikely(skb->pkt_type == PACKET_LOOPBACK))
		goto drop;

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (unlikely(packet_type == PACKET_TYPE_ANNOUNCE)) {
		cor_rcv_announce(skb);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CMSG_NOACK) {
		cor_rcv_cmsg(skb, ACK_NEEDED_NO);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CMSG_ACKSLOW) {
		cor_rcv_cmsg(skb, ACK_NEEDED_SLOW);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CMSG_ACKFAST) {
		cor_rcv_cmsg(skb, ACK_NEEDED_FAST);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CONNDATA) {
		cor_rcv_conndata(skb, 0, 0);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CONNDATA_LOWBUFDELAYED) {
		cor_rcv_conndata(skb, 1, 0);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CONNDATA_FLUSH) {
		cor_rcv_conndata(skb, 0, 1);
		return NET_RX_SUCCESS;
	} else if (packet_type == PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH) {
		cor_rcv_conndata(skb, 1, 1);
		return NET_RX_SUCCESS;
	} else {
		kfree_skb(skb);
		return NET_RX_SUCCESS;
	}

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}
int cor_netdev_notify_func(struct notifier_block *not, unsigned long event,
		void *ptr)
{
	struct net_device *dev = netdev_notifier_info_to_dev(ptr);
	int rc;

	switch (event) {
	case NETDEV_UP:
		if (dev->flags & IFF_LOOPBACK)
			break;

		BUG_ON(dev == 0);
		rc = cor_create_queue(dev);
		if (rc == 1)
			return 1;
		if (cor_is_clientmode() == 0)
			cor_announce_send_start(dev, dev->broadcast,
					ANNOUNCE_TYPE_BROADCAST);
		break;
	case NETDEV_DOWN:
		printk(KERN_ERR "down 1");
		udelay(100);
		BUG_ON(dev == 0);
		printk(KERN_ERR "down 2");
		udelay(100);
		cor_announce_send_stop(dev, 0, ANNOUNCE_TYPE_BROADCAST);
		printk(KERN_ERR "down 3");
		udelay(100);
		cor_reset_neighbors(dev);
		printk(KERN_ERR "down 4");
		udelay(100);
		cor_destroy_queue(dev);
		printk(KERN_ERR "down 5");
		udelay(100);
		break;
	case NETDEV_REBOOT:
	case NETDEV_CHANGE:
	case NETDEV_REGISTER:
	case NETDEV_UNREGISTER:
	case NETDEV_CHANGEMTU:
	case NETDEV_CHANGEADDR:
	case NETDEV_GOING_DOWN:
	case NETDEV_CHANGENAME:
	case NETDEV_FEAT_CHANGE:
	case NETDEV_BONDING_FAILOVER:
		break;
	default:
		return 1;
	}

	return 0;
}
static struct packet_type cor_ptype = {
	.type = htons(ETH_P_COR),
	.dev = 0,
	.func = cor_rcv
};

void cor_dev_down(void)
{
	if (cor_pack_registered != 0) {
		cor_pack_registered = 0;
		dev_remove_pack(&cor_ptype);
	}

	if (cor_netdev_notify_registered != 0) {
		if (unregister_netdevice_notifier(&cor_netdev_notify) != 0) {
			printk(KERN_WARNING "warning: cor_dev_down: "
					"unregister_netdevice_notifier failed");
			BUG();
		}
		cor_netdev_notify_registered = 0;
	}
}

int cor_dev_up(void)
{
	BUG_ON(cor_netdev_notify_registered != 0);
	if (register_netdevice_notifier(&cor_netdev_notify) != 0)
		return 1;
	cor_netdev_notify_registered = 1;

	BUG_ON(cor_pack_registered != 0);
	dev_add_pack(&cor_ptype);
	cor_pack_registered = 1;

	return 0;
}

int __init cor_dev_init(void)
{
	memset(&cor_netdev_notify, 0, sizeof(cor_netdev_notify));
	cor_netdev_notify.notifier_call = cor_netdev_notify_func;

	return 0;
}

void __exit cor_dev_exit1(void)
{
	flush_work(&cor_qos_waitexit_work);
}

MODULE_LICENSE("GPL");