neigh_snd: race condition fix, shorter connect_neigh cmd, 32 bit ports
[cor.git] / net / cor / dev.c
blob b553b9ae7f6f55c6e121f7baec5c60041d94f49f
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2021 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/version.h>
22 #include <linux/kernel.h>
23 #include <linux/init.h>
24 #include <linux/in.h>
25 #include <linux/kthread.h>
28 #include "cor.h"
30 static struct notifier_block cor_netdev_notify;
31 __u8 cor_netdev_notify_registered = 0;
33 __u8 cor_pack_registered = 0;
35 static DEFINE_SPINLOCK(cor_queues_lock);
36 static LIST_HEAD(cor_queues);
37 static LIST_HEAD(cor_queues_waitexit);
39 static void cor_qos_waitexit(struct work_struct *work);
40 DECLARE_WORK(cor_qos_waitexit_work, cor_qos_waitexit);
43 static void _cor_qos_enqueue(struct cor_qos_queue *q,
44 struct cor_resume_block *rb, ktime_t cmsg_send_start,
45 int caller, int from_nbcongwin_resume);
48 #ifdef DEBUG_QOS_SLOWSEND
49 static DEFINE_SPINLOCK(slowsend_lock);
50 static unsigned long cor_last_send;
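/*
 * Debug wrapper around dev_queue_xmit(): when DEBUG_QOS_SLOWSEND is
 * defined, transmission is throttled to roughly one packet per jiffy
 * and everything above that rate is dropped with NET_XMIT_DROP.
 */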
53 int _cor_dev_queue_xmit(struct sk_buff *skb, int caller)
55 int allowsend = 0;
56 unsigned long jiffies_tmp;
57 spin_lock_bh(&slowsend_lock);
58 jiffies_tmp = jiffies;
59 if (cor_last_send != jiffies_tmp) {
60 if (cor_last_send + 1 == jiffies_tmp) {
61 cor_last_send = jiffies_tmp;
62 } else {
63 cor_last_send = jiffies_tmp - 1;
65 allowsend = 1;
67 spin_unlock_bh(&slowsend_lock);
69 /* printk(KERN_ERR "cor_dev_queue_xmit %d, %d", caller, allowsend); */
70 if (allowsend) {
71 return dev_queue_xmit(skb);
72 } else {
73 kfree_skb(skb);
74 return NET_XMIT_DROP;
77 #endif
79 void cor_free_qos(struct kref *ref)
81 struct cor_qos_queue *q = container_of(ref, struct cor_qos_queue, ref);
82 kfree(q);
86 static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked);
88 /**
89 * neighbor congestion window:
90 * increment by 8192 (INCR_PER_RTT) every round trip if more than 2/3 of cwin is used
92 * in case of packet loss decrease by about 1/8:
93 * - up to 1/16 immediately and
94 * - up to another 1/16 during the next round trip
96 * in case of multiple packet loss events, do not decrement more than once per
97 * round trip
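 *
 * Worked example (illustrative numbers only): with cwin at 60000 bytes
 * and 45000 bytes in transit, more than 2/3 of the window is in use, so
 * each ack grows cwin by roughly bytes_acked * INCR_PER_RTT / cwin,
 * which adds up to at most INCR_PER_RTT per round trip. A retransmit
 * then drops cwin by cwin/16 (3750 bytes) at once and sets
 * cwin_shrinkto another 1/16 lower, so the rest of the shrink happens
 * gradually as further acks arrive.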
100 #ifdef COR_NBCONGWIN
102 /*extern __u64 get_bufspace_used(void);
104 static void print_conn_bufstats(struct cor_neighbor *nb)
106 / * not threadsafe, but this is only for debugging... * /
107 __u64 totalsize = 0;
108 __u64 read_remaining = 0;
109 __u32 numconns = 0;
110 struct list_head *lh;
111 unsigned long iflags;
113 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
115 lh = nb->conns_waiting.lh.next;
116 while (lh != &(nb->conns_waiting.lh)) {
117 struct cor_conn *cn = container_of(lh, struct cor_conn,
118 target.out.rb.lh);
119 totalsize += cn->data_buf.datasize;
120 read_remaining += cn->data_buf.read_remaining;
121 lh = lh->next;
124 lh = nb->conns_waiting.lh_nextpass.next;
125 while (lh != &(nb->conns_waiting.lh_nextpass)) {
126 struct cor_conn *cn = container_of(lh, struct cor_conn,
127 target.out.rb.lh);
128 totalsize += cn->data_buf.datasize;
129 read_remaining += cn->data_buf.read_remaining;
130 lh = lh->next;
133 numconns = nb->conns_waiting.cnt;
135 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
137 printk(KERN_ERR "conn %llu %llu %u", totalsize, read_remaining,
138 numconns);
139 } */
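/*
 * Called when conn data had to be retransmitted, i.e. on a loss
 * indication. If no shrink is pending (cwin_shrinkto == cwin), cwin is
 * cut by 1/16 immediately and cwin_shrinkto is set 1/16 below that so
 * cor_nbcongwin_update_cwin() keeps shrinking towards it; while a
 * shrink is still pending, further losses do not shrink the window
 * again.
 */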
141 void cor_nbcongwin_data_retransmitted(struct cor_neighbor *nb,
142 __u64 bytes_sent)
144 __u64 min_cwin = cor_mss_conndata(nb, 0)*2 << NBCONGWIN_SHIFT;
145 __u64 cwin;
147 unsigned long iflags;
149 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
151 cwin = atomic64_read(&(nb->nbcongwin.cwin));
153 /* printk(KERN_ERR "retrans %llu %llu", cwin >> NBCONGWIN_SHIFT,
154 get_bufspace_used());
155 print_conn_bufstats(nb); */
157 BUG_ON(nb->nbcongwin.cwin_shrinkto > cwin);
159 if (nb->nbcongwin.cwin_shrinkto == cwin) {
160 cwin = max(min_cwin, cwin - cwin/16);
161 atomic64_set(&(nb->nbcongwin.cwin), cwin);
164 nb->nbcongwin.cwin_shrinkto = max(min_cwin, cwin - cwin/16);
166 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
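/*
 * Recomputes cwin for bytes_acked newly acked bytes: first shrink
 * towards cwin_shrinkto by a quarter of the acked bytes, then, if more
 * than 2/3 of cwin is actually in transit, grow by roughly
 * bytes_acked * INCR_PER_RTT / cwin (the three division variants only
 * differ in how they avoid 64 bit overflow). Returns the new cwin in
 * NBCONGWIN_SHIFT-shifted units.
 */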
169 static __u64 cor_nbcongwin_update_cwin(struct cor_neighbor *nb_cwlocked,
170 __u64 data_intransit, __u64 bytes_acked)
172 __u64 CWIN_MUL = (1 << NBCONGWIN_SHIFT);
173 __u32 INCR_PER_RTT = 8192;
175 __u64 cwin = atomic64_read(&(nb_cwlocked->nbcongwin.cwin));
177 __u64 cwin_tmp;
178 __u64 incrby;
180 if (nb_cwlocked->nbcongwin.cwin_shrinkto < cwin) {
181 __u64 shrinkby = (bytes_acked << (NBCONGWIN_SHIFT-2));
182 if (unlikely(shrinkby > cwin))
183 cwin = 0;
184 else
185 cwin -= shrinkby;
187 if (cwin < nb_cwlocked->nbcongwin.cwin_shrinkto)
188 cwin = nb_cwlocked->nbcongwin.cwin_shrinkto;
192 if (cwin * 2 > data_intransit * CWIN_MUL * 3)
193 goto out;
195 cwin_tmp = max(cwin, bytes_acked << NBCONGWIN_SHIFT);
197 if (unlikely(bytes_acked >= U64_MAX/INCR_PER_RTT/CWIN_MUL))
198 incrby = div64_u64(bytes_acked * INCR_PER_RTT,
199 cwin_tmp / CWIN_MUL / CWIN_MUL);
200 else if (unlikely(bytes_acked >=
201 U64_MAX/INCR_PER_RTT/CWIN_MUL/CWIN_MUL))
202 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL,
203 cwin_tmp / CWIN_MUL);
204 else
205 incrby = div64_u64(bytes_acked * INCR_PER_RTT * CWIN_MUL *
206 CWIN_MUL, cwin_tmp);
208 BUG_ON(incrby > INCR_PER_RTT * CWIN_MUL);
210 if (unlikely(cwin + incrby < cwin))
211 cwin = U64_MAX;
212 else
213 cwin += incrby;
215 if (unlikely(nb_cwlocked->nbcongwin.cwin_shrinkto + incrby <
216 nb_cwlocked->nbcongwin.cwin_shrinkto))
217 nb_cwlocked->nbcongwin.cwin_shrinkto = U64_MAX;
218 else
219 nb_cwlocked->nbcongwin.cwin_shrinkto += incrby;
221 out:
222 atomic64_set(&(nb_cwlocked->nbcongwin.cwin), cwin);
224 return cwin;
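/*
 * Ack handler: updates cwin, subtracts the acked bytes from
 * data_intransit and, if the neighbor was parked in
 * RB_INQUEUE_NBCONGWIN and the remaining in-transit data now fits below
 * cwin, either clears the flag (no conns waiting) or requeues the
 * neighbor for sending.
 */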
227 void cor_nbcongwin_data_acked(struct cor_neighbor *nb, __u64 bytes_acked)
229 unsigned long iflags;
230 struct cor_qos_queue *q = nb->queue;
231 __u64 data_intransit;
232 __u64 cwin;
234 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
236 data_intransit = atomic64_read(&(nb->nbcongwin.data_intransit));
238 cwin = cor_nbcongwin_update_cwin(nb, data_intransit, bytes_acked);
240 BUG_ON(bytes_acked > data_intransit);
241 atomic64_sub(bytes_acked, &(nb->nbcongwin.data_intransit));
242 data_intransit -= bytes_acked;
244 if (data_intransit >= cwin >> NBCONGWIN_SHIFT)
245 goto out_sendnok;
247 spin_lock(&(q->qlock));
248 if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
249 if (nb->conns_waiting.cnt == 0) {
250 nb->rb.in_queue = RB_INQUEUE_FALSE;
251 } else {
252 _cor_qos_enqueue(q, &(nb->rb), ns_to_ktime(0),
253 QOS_CALLER_NEIGHBOR, 1);
256 spin_unlock(&(q->qlock));
259 out_sendnok:
260 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
263 void cor_nbcongwin_data_sent(struct cor_neighbor *nb, __u32 bytes_sent)
265 atomic64_add(bytes_sent, &(nb->nbcongwin.data_intransit));
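/*
 * Returns 1 while data_intransit stays below cwin. Otherwise returns 0
 * and marks the neighbor RB_INQUEUE_NBCONGWIN so that
 * cor_nbcongwin_data_acked() requeues it once enough data has been
 * acked; if the neighbor was on the regular send queue it is taken off
 * and its conns are removed from the queue counters first.
 */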
268 int cor_nbcongwin_send_allowed(struct cor_neighbor *nb)
270 unsigned long iflags;
271 int ret = 1;
272 struct cor_qos_queue *q = nb->queue;
273 int krefput_queue = 0;
275 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
276 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
277 return 1;
279 spin_lock_irqsave(&(nb->nbcongwin.lock), iflags);
281 if (atomic64_read(&(nb->nbcongwin.data_intransit)) <=
282 atomic64_read(&(nb->nbcongwin.cwin)) >> NBCONGWIN_SHIFT)
283 goto out_ok;
285 ret = 0;
287 spin_lock(&(q->qlock));
288 if (nb->rb.in_queue == RB_INQUEUE_FALSE) {
289 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
290 } else if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
291 list_del(&(nb->rb.lh));
292 kref_put(&(nb->ref), cor_kreffree_bug);
293 nb->rb.in_queue = RB_INQUEUE_NBCONGWIN;
294 BUG_ON(q->numconns < nb->conns_waiting.cnt);
295 q->numconns -= nb->conns_waiting.cnt;
296 q->priority_sum -= nb->conns_waiting.priority_sum;
297 krefput_queue = 1;
299 cor_qos_queue_set_congstatus(q);
300 } else if (nb->rb.in_queue == RB_INQUEUE_NBCONGWIN) {
301 } else {
302 BUG();
304 spin_unlock(&(q->qlock));
306 if (krefput_queue != 0)
307 kref_put(&(q->ref), cor_free_qos);
309 out_ok:
310 spin_unlock_irqrestore(&(nb->nbcongwin.lock), iflags);
312 return ret;
315 #endif
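/*
 * Burst priority: a conn that has been idle gets a temporary priority
 * boost of up to 3x its base priority (2x for high latency conns) when
 * it becomes active again, and _cor_resume_conns_accountbusytime()
 * advances jiffies_idle_since while it is sending so that the boost
 * decays.
 */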
317 static void _cor_resume_conns_accountbusytime(struct cor_conn *trgt_out_l,
318 __u32 priority, __u32 burstprio,
319 unsigned long jiffies_nb_lastduration)
322 unsigned long jiffies_tmp = jiffies;
323 __u64 jiffies_last_idle_mul = (1LL << JIFFIES_LAST_IDLE_SHIFT);
324 __u64 burstfactor;
325 __u64 jiffies_shifted_busy;
327 BUG_ON(burstprio < priority);
329 burstfactor = div_u64(1024LL * (__u64) burstprio, priority);
330 BUG_ON(burstfactor < 1024);
331 burstfactor = 1024 + (burstfactor - 1024) * 2;
333 jiffies_shifted_busy = (jiffies_nb_lastduration * burstfactor *
334 jiffies_last_idle_mul) / 1024;
336 BUG_ON(BURSTPRIO_MAXIDLETIME_SECS >
337 (1 << 30) / (HZ*jiffies_last_idle_mul));
339 if (unlikely(jiffies_shifted_busy > HZ * BURSTPRIO_MAXIDLETIME_SECS *
340 jiffies_last_idle_mul))
341 trgt_out_l->target.out.jiffies_idle_since =
342 jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
343 else
344 trgt_out_l->target.out.jiffies_idle_since +=
345 jiffies_shifted_busy;
347 if (unlikely(time_before(jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT,
348 trgt_out_l->target.out.jiffies_idle_since)))
349 trgt_out_l->target.out.jiffies_idle_since =
350 jiffies_tmp << JIFFIES_LAST_IDLE_SHIFT;
353 unsigned long cor_get_conn_idletime(struct cor_conn *trgt_out_l)
355 unsigned long jiffies_shifted = jiffies << JIFFIES_LAST_IDLE_SHIFT;
356 __u32 burst_maxidle_hz_shifted = (BURSTPRIO_MAXIDLETIME_SECS*HZ) <<
357 JIFFIES_LAST_IDLE_SHIFT;
358 unsigned long idletime_hz_shifted;
360 if (unlikely(time_before(jiffies_shifted,
361 trgt_out_l->target.out.jiffies_idle_since))) {
362 idletime_hz_shifted = 0;
363 trgt_out_l->target.out.jiffies_idle_since = jiffies_shifted -
364 burst_maxidle_hz_shifted;
365 } else {
366 idletime_hz_shifted = jiffies_shifted -
367 trgt_out_l->target.out.jiffies_idle_since;
369 if (unlikely(idletime_hz_shifted > burst_maxidle_hz_shifted)) {
370 idletime_hz_shifted = burst_maxidle_hz_shifted;
371 trgt_out_l->target.out.jiffies_idle_since =
372 jiffies_shifted -
373 burst_maxidle_hz_shifted;
377 return idletime_hz_shifted;
380 static __u32 _cor_resume_conns_burstprio(struct cor_conn *trgt_out_l,
381 __u32 priority)
383 unsigned long idletime_hz_shifted = cor_get_conn_idletime(trgt_out_l);
384 __u32 idletime_msecs = jiffies_to_msecs(idletime_hz_shifted >>
385 JIFFIES_LAST_IDLE_SHIFT);
386 __u32 burstfactor;
387 __u64 newprio;
389 BUG_ON(idletime_msecs > BURSTPRIO_MAXIDLETIME_SECS*1000);
390 BUG_ON(BURSTPRIO_MAXIDLETIME_SECS*1000LL > U32_MAX / 1024);
392 burstfactor = (1024 * idletime_msecs) /
393 (BURSTPRIO_MAXIDLETIME_SECS * 1000);
395 if (trgt_out_l->is_highlatency != 0)
396 newprio = (((__u64) priority) * (1024 + 1 * burstfactor)) /
397 1024;
398 else
399 newprio = (((__u64) priority) * (1024 + 2 * burstfactor)) /
400 1024;
402 BUG_ON(newprio > U32_MAX);
403 return (__u32) newprio;
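/*
 * Updates the priority bookkeeping for this conn and returns how many
 * bytes it may send in this pass: 1024 bytes per round (2048 when at
 * most 4 conns are queued, plus 1/8 extra for high latency conns),
 * scaled by the conn's priority relative to the average priority of
 * all queued conns.
 */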
406 static __u64 _cor_resume_conns_maxsend(struct cor_qos_queue *q,
407 struct cor_conn *trgt_out_l, __u32 newpriority,
408 int *maxsend_forcedelay)
410 unsigned long iflags;
412 struct cor_neighbor *nb = trgt_out_l->target.out.nb;
413 __u32 oldpriority = trgt_out_l->target.out.rb_priority;
414 __u64 priority_sum;
415 __u32 numconns;
416 __u64 bytes_per_round;
418 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
419 spin_lock(&(q->qlock));
421 if (unlikely(unlikely(trgt_out_l->target.out.rb.in_queue !=
422 RB_INQUEUE_TRUE) ||
423 unlikely(nb->rb.in_queue != RB_INQUEUE_TRUE))) {
424 spin_unlock(&(q->qlock));
425 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
427 return 1024LL;
430 BUG_ON(nb->conns_waiting.priority_sum < oldpriority);
431 BUG_ON(q->priority_sum < oldpriority);
432 nb->conns_waiting.priority_sum -= oldpriority;
433 q->priority_sum -= oldpriority;
435 BUG_ON(nb->conns_waiting.priority_sum + newpriority <
436 nb->conns_waiting.priority_sum);
437 BUG_ON(q->priority_sum + newpriority < q->priority_sum);
438 nb->conns_waiting.priority_sum += newpriority;
439 q->priority_sum += newpriority;
441 priority_sum = q->priority_sum;
442 numconns = q->numconns;
444 spin_unlock(&(q->qlock));
445 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
447 trgt_out_l->target.out.rb_priority = newpriority;
449 if (numconns <= 4) {
450 *maxsend_forcedelay = 1;
451 bytes_per_round = 2048LL;
452 } else {
453 *maxsend_forcedelay = 0;
454 bytes_per_round = 1024LL;
457 if (trgt_out_l->is_highlatency != 0)
458 bytes_per_round += bytes_per_round/8;
460 return div_u64(bytes_per_round * ((__u64) newpriority) *
461 ((__u64) numconns), priority_sum);
464 static int _cor_resume_neighbors_nextpass(
465 struct cor_neighbor *nb_waitingconnslocked)
467 BUG_ON(list_empty(&(nb_waitingconnslocked->conns_waiting.lh)) == 0);
469 if (list_empty(&(nb_waitingconnslocked->conns_waiting.lh_nextpass))) {
470 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt != 0);
471 return 1;
474 BUG_ON(nb_waitingconnslocked->conns_waiting.cnt == 0);
476 cor_swap_list_items(&(nb_waitingconnslocked->conns_waiting.lh),
477 &(nb_waitingconnslocked->conns_waiting.lh_nextpass));
479 return 0;
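/*
 * Sends conn data for one neighbor: repeatedly takes the first conn off
 * conns_waiting.lh, moves it to lh_nextpass and flushes up to maxsend
 * bytes of it, until the list is empty (QOS_RESUME_DONE or
 * QOS_RESUME_NEXTNEIGHBOR) or sending hits congestion or an allocation
 * failure (QOS_RESUME_CONG).
 */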
482 static int _cor_resume_neighbors(struct cor_qos_queue *q,
483 struct cor_neighbor *nb, unsigned long jiffies_nb_lastduration,
484 int *progress)
486 unsigned long iflags;
488 while (1) {
489 __u32 priority;
490 __u32 burstprio;
491 __u32 maxsend;
492 int maxsend_forcedelay = 0;
494 int rc2;
495 __u32 sent2 = 0;
497 struct cor_conn *cn = 0;
498 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
499 if (list_empty(&(nb->conns_waiting.lh)) != 0) {
500 int done = _cor_resume_neighbors_nextpass(nb);
501 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
502 iflags);
503 return done ? QOS_RESUME_DONE : QOS_RESUME_NEXTNEIGHBOR;
505 BUG_ON(nb->conns_waiting.cnt == 0);
507 cn = container_of(nb->conns_waiting.lh.next, struct cor_conn,
508 target.out.rb.lh);
509 BUG_ON(cn->targettype != TARGET_OUT);
510 BUG_ON(cn->target.out.rb.lh.prev != &(nb->conns_waiting.lh));
511 BUG_ON((cn->target.out.rb.lh.next == &(nb->conns_waiting.lh)) &&
512 (nb->conns_waiting.lh.prev !=
513 &(cn->target.out.rb.lh)));
514 list_del(&(cn->target.out.rb.lh));
515 list_add_tail(&(cn->target.out.rb.lh),
516 &(nb->conns_waiting.lh_nextpass));
517 kref_get(&(cn->ref));
518 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
521 priority = cor_refresh_conn_priority(cn, 0);
523 spin_lock_bh(&(cn->rcv_lock));
525 if (unlikely(cn->targettype != TARGET_OUT)) {
526 spin_unlock_bh(&(cn->rcv_lock));
527 continue;
530 burstprio = _cor_resume_conns_burstprio(cn, priority);
532 maxsend = _cor_resume_conns_maxsend(q, cn, burstprio,
533 &maxsend_forcedelay);
534 if (cn->target.out.maxsend_extra >= maxsend)
535 maxsend_forcedelay = 0;
536 maxsend += cn->target.out.maxsend_extra;
537 if (unlikely(maxsend > U32_MAX))
538 maxsend = U32_MAX;
539 if (unlikely(maxsend >= 65536))
540 maxsend_forcedelay = 0;
542 rc2 = _cor_flush_out(cn, maxsend, &sent2, 1,
543 maxsend_forcedelay);
545 if (rc2 == RC_FLUSH_CONN_OUT_OK ||
546 rc2 == RC_FLUSH_CONN_OUT_NBNOTACTIVE) {
547 cn->target.out.maxsend_extra = 0;
548 cor_qos_remove_conn(cn);
549 } else if (sent2 == 0 && (rc2 == RC_FLUSH_CONN_OUT_CONG ||
550 rc2 == RC_FLUSH_CONN_OUT_OOM)) {
551 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
552 if (likely(cn->target.out.rb.in_queue !=
553 RB_INQUEUE_FALSE)) {
554 list_del(&(cn->target.out.rb.lh));
555 list_add(&(cn->target.out.rb.lh),
556 &(nb->conns_waiting.lh));
558 spin_unlock_irqrestore(&(nb->conns_waiting.lock),
559 iflags);
560 } else if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
561 rc2 == RC_FLUSH_CONN_OUT_OOM) {
562 cn->target.out.maxsend_extra = 0;
563 } else if (likely(rc2 == RC_FLUSH_CONN_OUT_MAXSENT)) {
564 if (unlikely(maxsend - sent2 > 65535))
565 cn->target.out.maxsend_extra = 65535;
566 else
567 cn->target.out.maxsend_extra = maxsend - sent2;
570 if (sent2 != 0)
571 _cor_resume_conns_accountbusytime(cn, priority,
572 burstprio, jiffies_nb_lastduration);
574 spin_unlock_bh(&(cn->rcv_lock));
576 if (sent2 != 0) {
577 *progress = 1;
578 cor_wake_sender(cn);
581 kref_put(&(cn->ref), cor_free_conn);
583 if (rc2 == RC_FLUSH_CONN_OUT_CONG ||
584 rc2 == RC_FLUSH_CONN_OUT_OOM) {
585 return QOS_RESUME_CONG;
590 static struct cor_neighbor *cor_resume_neighbors_peeknextnb(
591 struct cor_qos_queue *q, unsigned long *jiffies_nb_lastduration)
593 unsigned long iflags;
595 struct cor_neighbor *nb;
597 spin_lock_irqsave(&(q->qlock), iflags);
599 if (list_empty(&(q->neighbors_waiting))) {
600 if (list_empty(&(q->neighbors_waiting_nextpass))) {
601 BUG_ON(q->numconns != 0);
602 spin_unlock_irqrestore(&(q->qlock), iflags);
604 return 0;
605 } else {
606 unsigned long jiffies_tmp = jiffies;
607 cor_swap_list_items(&(q->neighbors_waiting),
608 &(q->neighbors_waiting_nextpass));
610 WARN_ONCE(time_before(jiffies_tmp,
611 q->jiffies_nb_pass_start),
612 "cor_resume_neighbors_peeknextnb: jiffies before jiffies_nb_pass_start (this is only a performance issue)");
614 q->jiffies_nb_lastduration = jiffies -
615 q->jiffies_nb_pass_start;
616 q->jiffies_nb_pass_start = jiffies_tmp;
620 *jiffies_nb_lastduration = q->jiffies_nb_lastduration;
623 BUG_ON(q->numconns == 0);
624 BUG_ON(list_empty(&(q->neighbors_waiting)));
626 nb = container_of(q->neighbors_waiting.next, struct cor_neighbor,
627 rb.lh);
629 BUG_ON(nb->rb.in_queue != RB_INQUEUE_TRUE);
630 BUG_ON(nb->rb.lh.prev != &(q->neighbors_waiting));
631 BUG_ON((nb->rb.lh.next == &(q->neighbors_waiting)) &&
632 (q->neighbors_waiting.prev != &(nb->rb.lh)));
634 kref_get(&(nb->ref));
636 spin_unlock_irqrestore(&(q->qlock), iflags);
638 return nb;
641 static int cor_resume_neighbors(struct cor_qos_queue *q, int *sent)
643 unsigned long iflags;
644 int rc;
646 unsigned long jiffies_nb_lastduration;
647 struct cor_neighbor *nb = cor_resume_neighbors_peeknextnb(q,
648 &jiffies_nb_lastduration);
650 if (nb == 0)
651 return QOS_RESUME_DONE;
653 atomic_set(&(nb->cmsg_delay_conndata), 1);
655 rc = _cor_resume_neighbors(q, nb, jiffies_nb_lastduration, sent);
656 if (rc == QOS_RESUME_CONG) {
657 kref_put(&(nb->ref), cor_neighbor_free);
658 return QOS_RESUME_CONG;
660 BUG_ON(rc != QOS_RESUME_DONE && rc != QOS_RESUME_NEXTNEIGHBOR);
662 atomic_set(&(nb->cmsg_delay_conndata), 0);
663 spin_lock_bh(&(nb->cmsg_lock));
664 cor_schedule_controlmsg_timer(nb);
665 spin_unlock_bh(&(nb->cmsg_lock));
667 spin_lock_irqsave(&(q->qlock), iflags);
668 if (likely(nb->rb.in_queue == RB_INQUEUE_TRUE)) {
669 if (nb->conns_waiting.cnt == 0) {
670 nb->rb.in_queue = RB_INQUEUE_FALSE;
671 list_del(&(nb->rb.lh));
672 kref_put(&(nb->ref), cor_kreffree_bug);
673 } else {
674 list_del(&(nb->rb.lh));
675 list_add_tail(&(nb->rb.lh),
676 &(q->neighbors_waiting_nextpass));
679 spin_unlock_irqrestore(&(q->qlock), iflags);
681 kref_put(&(nb->ref), cor_neighbor_free);
683 return QOS_RESUME_NEXTNEIGHBOR;
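/*
 * Drains one of the kpackets_waiting, conn_retrans_waiting or
 * announce_waiting lists, selected by caller. An entry that could not
 * be sent completely is put back at the head of its list and the loop
 * stops, so it is retried first on the next resume.
 */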
686 static int __cor_qos_resume(struct cor_qos_queue *q, int caller, int *sent)
688 unsigned long iflags;
689 int rc = QOS_RESUME_DONE;
690 struct list_head *lh;
692 spin_lock_irqsave(&(q->qlock), iflags);
694 if (caller == QOS_CALLER_KPACKET)
695 lh = &(q->kpackets_waiting);
696 else if (caller == QOS_CALLER_CONN_RETRANS)
697 lh = &(q->conn_retrans_waiting);
698 else if (caller == QOS_CALLER_ANNOUNCE)
699 lh = &(q->announce_waiting);
700 else
701 BUG();
703 while (list_empty(lh) == 0) {
704 struct cor_resume_block *rb = container_of(lh->next,
705 struct cor_resume_block, lh);
706 ktime_t cmsg_send_start;
707 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
708 rb->in_queue = RB_INQUEUE_FALSE;
709 list_del(&(rb->lh));
711 if (caller == QOS_CALLER_KPACKET)
712 cmsg_send_start = container_of(rb, struct cor_neighbor,
713 rb_kp)->cmsg_send_start;
715 spin_unlock_irqrestore(&(q->qlock), iflags);
716 if (caller == QOS_CALLER_KPACKET) {
717 rc = cor_send_messages(container_of(rb,
718 struct cor_neighbor, rb_kp),
719 cmsg_send_start, sent);
720 } else if (caller == QOS_CALLER_CONN_RETRANS) {
721 rc = cor_send_retrans(container_of(rb,
722 struct cor_neighbor, rb_cr), sent);
723 } else if (caller == QOS_CALLER_ANNOUNCE) {
724 rc = _cor_send_announce(container_of(rb,
725 struct cor_announce_data, rb), 1, sent);
726 } else {
727 BUG();
729 spin_lock_irqsave(&(q->qlock), iflags);
731 if (rc != QOS_RESUME_DONE && caller == QOS_CALLER_KPACKET)
732 container_of(rb, struct cor_neighbor, rb_kp
733 )->cmsg_send_start = cmsg_send_start;
735 if (rc != QOS_RESUME_DONE && rb->in_queue == RB_INQUEUE_FALSE) {
736 rb->in_queue = RB_INQUEUE_TRUE;
737 list_add(&(rb->lh), lh);
738 break;
741 if (caller == QOS_CALLER_KPACKET) {
742 kref_put(&(container_of(rb, struct cor_neighbor,
743 rb_kp)->ref), cor_neighbor_free);
744 } else if (caller == QOS_CALLER_CONN_RETRANS) {
745 kref_put(&(container_of(rb, struct cor_neighbor,
746 rb_cr)->ref), cor_neighbor_free);
747 } else if (caller == QOS_CALLER_ANNOUNCE) {
748 kref_put(&(container_of(rb,
749 struct cor_announce_data, rb)->ref),
750 cor_announce_data_free);
751 } else {
752 BUG();
755 kref_put(&(q->ref), cor_kreffree_bug);
758 spin_unlock_irqrestore(&(q->qlock), iflags);
760 return rc;
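/*
 * Top level resume loop run by the qos_resume thread: kernel packets
 * are sent first, then conn retransmits, then announcements, then conn
 * data, and after every batch the loop restarts at the highest priority
 * non-empty list. It stops on congestion, on an empty queue or when the
 * device is going away.
 */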
763 static int _cor_qos_resume(struct cor_qos_queue *q, int *sent)
765 unsigned long iflags;
766 int i = QOS_CALLER_KPACKET;
767 int rc;
769 spin_lock_irqsave(&(q->qlock), iflags);
771 while (1) {
772 if (q->dev == 0) {
773 rc = QOS_RESUME_EXIT;
774 break;
777 if (i == QOS_CALLER_KPACKET &&
778 list_empty(&(q->kpackets_waiting))) {
779 i = QOS_CALLER_CONN_RETRANS;
780 continue;
781 } else if (i == QOS_CALLER_CONN_RETRANS &&
782 list_empty(&(q->conn_retrans_waiting))) {
783 i = QOS_CALLER_ANNOUNCE;
784 continue;
785 } else if (i == QOS_CALLER_ANNOUNCE &&
786 list_empty(&(q->announce_waiting))) {
787 i = QOS_CALLER_NEIGHBOR;
788 continue;
789 } else if (i == QOS_CALLER_NEIGHBOR &&
790 list_empty(&(q->neighbors_waiting)) &&
791 list_empty(&(q->neighbors_waiting_nextpass))) {
792 rc = QOS_RESUME_DONE;
793 break;
796 spin_unlock_irqrestore(&(q->qlock), iflags);
798 if (i == QOS_CALLER_NEIGHBOR) {
799 rc = cor_resume_neighbors(q, sent);
800 } else {
801 rc = __cor_qos_resume(q, i, sent);
804 spin_lock_irqsave(&(q->qlock), iflags);
806 if (rc == QOS_RESUME_CONG)
807 break;
809 i = QOS_CALLER_KPACKET;
812 if (rc == QOS_RESUME_DONE) {
813 BUG_ON(!list_empty(&(q->kpackets_waiting)));
814 BUG_ON(!list_empty(&(q->conn_retrans_waiting)));
815 BUG_ON(!list_empty(&(q->announce_waiting)));
816 BUG_ON(!list_empty(&(q->neighbors_waiting)));
817 BUG_ON(!list_empty(&(q->neighbors_waiting_nextpass)));
819 atomic_set(&(q->qos_resume_scheduled), 0);
822 cor_qos_queue_set_congstatus(q);
824 if (q->dev == 0)
825 rc = QOS_RESUME_EXIT;
827 spin_unlock_irqrestore(&(q->qlock), iflags);
829 return rc;
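/*
 * Per-queue kernel thread: calls _cor_qos_resume() until it reports
 * QOS_RESUME_DONE and then sleeps on qos_resume_wq. On congestion it
 * backs off for roughly a quarter of the time since the last progress
 * (e.g. 40ms without progress gives a 12ms sleep), clamped to 2..20ms.
 */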
832 int cor_qos_resume_threadfunc(void *data)
834 struct cor_qos_queue *q = (struct cor_qos_queue *) data;
836 while (1) {
837 int sent = 0;
838 int rc;
840 rc = _cor_qos_resume(q, &sent);
842 if (rc == QOS_RESUME_DONE) {
843 wait_event(q->qos_resume_wq,
844 atomic_read(&(q->qos_resume_scheduled))
845 != 0);
846 } else if (rc == QOS_RESUME_CONG) {
847 unsigned long jiffies_tmp = jiffies;
848 unsigned long delay_ms = 0;
850 if (sent)
851 q->jiffies_lastprogress = jiffies_tmp;
852 delay_ms = (jiffies_to_msecs(jiffies_tmp -
853 q->jiffies_lastprogress) + 8) / 4;
854 if (delay_ms < 2) {
855 delay_ms = 2;
856 } else if (delay_ms > 20) {
857 delay_ms = 20;
860 msleep(delay_ms);
861 } else if (rc == QOS_RESUME_EXIT) {
862 return 0;
863 } else {
864 BUG();
869 static inline int cor_qos_queue_is_destroyed(struct cor_qos_queue *q_locked)
871 return q_locked->dev == 0;
874 struct cor_qos_queue *cor_get_queue(struct net_device *dev)
876 struct cor_qos_queue *ret = 0;
877 struct list_head *curr;
879 spin_lock_bh(&cor_queues_lock);
880 curr = cor_queues.next;
881 while (curr != (&cor_queues)) {
882 struct cor_qos_queue *q = container_of(curr,
883 struct cor_qos_queue, queue_list);
884 if (q->dev == dev) {
885 ret = q;
886 kref_get(&(ret->ref));
887 break;
889 curr = curr->next;
891 spin_unlock_bh(&cor_queues_lock);
892 return ret;
895 static void cor_qos_waitexit(struct work_struct *work)
897 spin_lock_bh(&cor_queues_lock);
898 while (!list_empty(&cor_queues_waitexit)) {
899 struct cor_qos_queue *q = container_of(cor_queues_waitexit.next,
900 struct cor_qos_queue, queue_list);
901 list_del(&(q->queue_list));
903 spin_unlock_bh(&cor_queues_lock);
905 kthread_stop(q->qos_resume_thread);
906 put_task_struct(q->qos_resume_thread);
907 kref_put(&(q->ref), cor_free_qos);
909 spin_lock_bh(&cor_queues_lock);
911 spin_unlock_bh(&cor_queues_lock);
914 static void _cor_destroy_queue_kpackets(struct cor_qos_queue *q)
916 while (list_empty(&(q->kpackets_waiting)) == 0) {
917 struct list_head *curr = q->kpackets_waiting.next;
918 struct cor_resume_block *rb = container_of(curr,
919 struct cor_resume_block, lh);
920 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
921 rb->in_queue = RB_INQUEUE_FALSE;
922 list_del(curr);
924 kref_put(&(container_of(rb, struct cor_neighbor, rb_kp)->ref),
925 cor_neighbor_free);
926 kref_put(&(q->ref), cor_kreffree_bug);
930 static void _cor_destroy_queue_conn_retrans(struct cor_qos_queue *q)
932 while (list_empty(&(q->conn_retrans_waiting)) == 0) {
933 struct list_head *curr = q->conn_retrans_waiting.next;
934 struct cor_resume_block *rb = container_of(curr,
935 struct cor_resume_block, lh);
936 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
937 rb->in_queue = RB_INQUEUE_FALSE;
938 list_del(curr);
940 kref_put(&(container_of(rb, struct cor_neighbor, rb_cr)->ref),
941 cor_neighbor_free);
942 kref_put(&(q->ref), cor_kreffree_bug);
946 static void _cor_destroy_queue_announce(struct cor_qos_queue *q)
948 while (list_empty(&(q->announce_waiting)) == 0) {
949 struct list_head *curr = q->announce_waiting.next;
950 struct cor_resume_block *rb = container_of(curr,
951 struct cor_resume_block, lh);
952 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
953 rb->in_queue = RB_INQUEUE_FALSE;
954 list_del(curr);
956 kref_put(&(container_of(rb, struct cor_announce_data, rb)->ref),
957 cor_announce_data_free);
958 kref_put(&(q->ref), cor_kreffree_bug);
962 static void _cor_destroy_queue_neighbor(struct cor_qos_queue *q,
963 struct list_head *lh)
965 while (list_empty(lh) == 0) {
966 struct list_head *curr = lh->next;
967 struct cor_resume_block *rb = container_of(curr,
968 struct cor_resume_block, lh);
969 BUG_ON(rb->in_queue != RB_INQUEUE_TRUE);
970 rb->in_queue = RB_INQUEUE_FALSE;
971 list_del(curr);
973 kref_put(&(container_of(rb, struct cor_neighbor, rb)->ref),
974 cor_neighbor_free);
975 kref_put(&(q->ref), cor_kreffree_bug);
979 static struct cor_qos_queue *cor_unlink_queue(struct net_device *dev)
981 struct cor_qos_queue *ret = 0;
982 struct list_head *curr;
984 spin_lock_bh(&cor_queues_lock);
985 curr = cor_queues.next;
986 while (curr != (&cor_queues)) {
987 struct cor_qos_queue *q = container_of(curr,
988 struct cor_qos_queue, queue_list);
989 if (dev == 0 || q->dev == dev) {
990 ret = q;
991 kref_get(&(ret->ref));
993 list_del(&(q->queue_list));
994 kref_put(&(q->ref), cor_kreffree_bug);
995 break;
997 curr = curr->next;
999 spin_unlock_bh(&cor_queues_lock);
1000 return ret;
1003 int cor_destroy_queue(struct net_device *dev)
1005 int rc = 1;
1006 unsigned long iflags;
1008 while (1) {
1009 struct cor_qos_queue *q = cor_unlink_queue(dev);
1011 if (q == 0)
1012 break;
1014 rc = 0;
1016 spin_lock_irqsave(&(q->qlock), iflags);
1017 if (q->dev != 0) {
1018 dev_put(q->dev);
1019 q->dev = 0;
1021 _cor_destroy_queue_kpackets(q);
1022 _cor_destroy_queue_conn_retrans(q);
1023 _cor_destroy_queue_announce(q);
1024 _cor_destroy_queue_neighbor(q, &(q->neighbors_waiting));
1025 _cor_destroy_queue_neighbor(q, &(q->neighbors_waiting_nextpass));
1027 spin_unlock_irqrestore(&(q->qlock), iflags);
1029 cor_schedule_qos_resume(q);
1031 spin_lock_bh(&cor_queues_lock);
1032 list_add(&(q->queue_list), &cor_queues_waitexit);
1033 spin_unlock_bh(&cor_queues_lock);
1035 schedule_work(&cor_qos_waitexit_work);
1038 return rc;
1041 int cor_create_queue(struct net_device *dev)
1043 struct cor_qos_queue *q = kmalloc(sizeof(struct cor_qos_queue),
1044 GFP_KERNEL);
1046 if (q == 0) {
1047 printk(KERN_ERR "cor: unable to allocate memory for device "
1048 "queue, not enabling device");
1049 return 1;
1052 memset(q, 0, sizeof(struct cor_qos_queue));
1054 spin_lock_init(&(q->qlock));
1056 kref_init(&(q->ref));
1058 q->dev = dev;
1059 dev_hold(dev);
1061 atomic_set(&(q->qos_resume_scheduled), 0);
1063 init_waitqueue_head(&(q->qos_resume_wq));
1065 INIT_LIST_HEAD(&(q->kpackets_waiting));
1066 INIT_LIST_HEAD(&(q->conn_retrans_waiting));
1067 INIT_LIST_HEAD(&(q->announce_waiting));
1068 INIT_LIST_HEAD(&(q->neighbors_waiting));
1069 INIT_LIST_HEAD(&(q->neighbors_waiting_nextpass));
1071 atomic_set(&(q->cong_status), 0);
1073 q->qos_resume_thread = kthread_create(cor_qos_resume_threadfunc,
1074 q, "cor_qos_resume");
1075 if (IS_ERR(q->qos_resume_thread)) {
1076 printk(KERN_ERR "cor: unable to start qos_resume thread");
1078 if (q->dev != 0) {
1079 dev_put(q->dev);
1080 q->dev = 0;
1083 kref_put(&(q->ref), cor_free_qos);
1085 return 1;
1087 get_task_struct(q->qos_resume_thread);
1088 wake_up_process(q->qos_resume_thread);
1090 spin_lock_bh(&cor_queues_lock);
1091 list_add(&(q->queue_list), &cor_queues);
1092 spin_unlock_bh(&cor_queues_lock);
1094 return 0;
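/*
 * Recomputes q->cong_status: CONGSTATUS_NONE once the last drop is more
 * than HZ/50 old, otherwise the first non-empty list, checked from
 * kernel packets down to conn data, determines the reported status.
 */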
1097 static void cor_qos_queue_set_congstatus(struct cor_qos_queue *q_locked)
1099 __u32 newstatus;
1101 if (time_before(q_locked->jiffies_lastdrop, jiffies - HZ/50)) {
1102 newstatus = CONGSTATUS_NONE;
1103 } else if (list_empty(&(q_locked->kpackets_waiting)) == 0) {
1104 newstatus = CONGSTATUS_KPACKETS;
1105 } else if (list_empty(&(q_locked->conn_retrans_waiting)) == 0) {
1106 newstatus = CONGSTATUS_RETRANS;
1107 } else if (list_empty(&(q_locked->announce_waiting)) == 0) {
1108 newstatus = CONGSTATUS_ANNOUNCE;
1109 } else if (list_empty(&(q_locked->neighbors_waiting)) == 0 ||
1110 list_empty(&(q_locked->neighbors_waiting_nextpass)) ==
1111 0) {
1112 newstatus = CONGSTATUS_CONNDATA;
1113 } else {
1114 newstatus = CONGSTATUS_NONE;
1117 atomic_set(&(q_locked->cong_status), newstatus);
1120 void cor_qos_set_lastdrop(struct cor_qos_queue *q)
1122 unsigned long iflags;
1124 spin_lock_irqsave(&(q->qlock), iflags);
1125 q->jiffies_lastdrop = jiffies;
1126 cor_qos_queue_set_congstatus(q);
1127 spin_unlock_irqrestore(&(q->qlock), iflags);
1131 * if caller == QOS_CALLER_NEIGHBOR, nb->conns_waiting.lock must be held by
1132 * caller
1134 static void _cor_qos_enqueue(struct cor_qos_queue *q,
1135 struct cor_resume_block *rb, ktime_t cmsg_send_start,
1136 int caller, int from_nbcongwin_resume)
1138 int queues_empty;
1140 if (rb->in_queue == RB_INQUEUE_TRUE) {
1141 BUG_ON(caller == QOS_CALLER_NEIGHBOR);
1143 if (caller == QOS_CALLER_KPACKET) {
1144 struct cor_neighbor *nb = container_of(rb,
1145 struct cor_neighbor, rb_kp);
1146 if (ktime_before(cmsg_send_start, nb->cmsg_send_start))
1147 nb->cmsg_send_start = cmsg_send_start;
1149 return;
1150 } else if (rb->in_queue == RB_INQUEUE_NBCONGWIN &&
1151 from_nbcongwin_resume == 0) {
1152 return;
1155 if (unlikely(cor_qos_queue_is_destroyed(q)))
1156 return;
1158 queues_empty = list_empty(&(q->kpackets_waiting)) &&
1159 list_empty(&(q->conn_retrans_waiting)) &&
1160 list_empty(&(q->announce_waiting)) &&
1161 list_empty(&(q->neighbors_waiting)) &&
1162 list_empty(&(q->neighbors_waiting_nextpass));
1164 BUG_ON(!queues_empty && atomic_read(&(q->qos_resume_scheduled)) == 0);
1166 rb->in_queue = RB_INQUEUE_TRUE;
1168 if (caller == QOS_CALLER_KPACKET) {
1169 struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
1170 rb_kp);
1171 nb->cmsg_send_start = cmsg_send_start;
1172 list_add_tail(&(rb->lh), &(q->kpackets_waiting));
1173 kref_get(&(nb->ref));
1174 } else if (caller == QOS_CALLER_CONN_RETRANS) {
1175 list_add_tail(&(rb->lh) , &(q->conn_retrans_waiting));
1176 kref_get(&(container_of(rb, struct cor_neighbor, rb_cr)->ref));
1177 } else if (caller == QOS_CALLER_ANNOUNCE) {
1178 list_add_tail(&(rb->lh), &(q->announce_waiting));
1179 kref_get(&(container_of(rb, struct cor_announce_data, rb)->ref));
1180 } else if (caller == QOS_CALLER_NEIGHBOR) {
1181 struct cor_neighbor *nb = container_of(rb, struct cor_neighbor,
1182 rb);
1183 list_add_tail(&(rb->lh), &(q->neighbors_waiting_nextpass));
1184 kref_get(&(nb->ref));
1185 BUG_ON(nb->conns_waiting.cnt == 0);
1186 q->numconns += nb->conns_waiting.cnt;
1187 q->priority_sum += nb->conns_waiting.priority_sum;
1188 q->jiffies_nb_lastduration = 0;
1189 q->jiffies_nb_pass_start = jiffies;
1190 } else {
1191 BUG();
1193 kref_get(&(q->ref));
1195 cor_schedule_qos_resume(q);
1197 cor_qos_queue_set_congstatus(q);
1200 void cor_qos_enqueue(struct cor_qos_queue *q, struct cor_resume_block *rb,
1201 ktime_t cmsg_send_start, int caller)
1203 unsigned long iflags;
1205 spin_lock_irqsave(&(q->qlock), iflags);
1206 _cor_qos_enqueue(q, rb, cmsg_send_start, caller, 0);
1207 spin_unlock_irqrestore(&(q->qlock), iflags);
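/*
 * Takes a conn off its neighbor's conns_waiting list and fixes up the
 * conn and priority counters. If that leaves the neighbor without
 * waiting conns, the neighbor itself is dequeued and, if conndata
 * control messages were being delayed, the control message timer is
 * rescheduled.
 */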
1210 void cor_qos_remove_conn(struct cor_conn *trgt_out_lx)
1212 unsigned long iflags;
1213 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
1214 struct cor_qos_queue *q = nb->queue;
1215 int sched_cmsg = 0;
1216 int krefput_nb = 0;
1218 BUG_ON(trgt_out_lx->targettype != TARGET_OUT);
1219 BUG_ON(q == 0);
1221 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1222 if (trgt_out_lx->target.out.rb.in_queue == RB_INQUEUE_FALSE) {
1223 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1224 return;
1226 spin_lock(&(q->qlock));
1228 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_FALSE;
1229 list_del(&(trgt_out_lx->target.out.rb.lh));
1230 BUG_ON(nb->conns_waiting.cnt == 0);
1231 nb->conns_waiting.cnt--;
1232 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1233 BUG_ON(q->numconns == 0);
1234 q->numconns--;
1237 BUG_ON(nb->conns_waiting.priority_sum <
1238 trgt_out_lx->target.out.rb_priority);
1239 BUG_ON(q->priority_sum < trgt_out_lx->target.out.rb_priority);
1240 nb->conns_waiting.priority_sum -=
1241 trgt_out_lx->target.out.rb_priority;
1242 q->priority_sum -= trgt_out_lx->target.out.rb_priority;
1243 trgt_out_lx->target.out.rb_priority = 0;
1245 if (list_empty(&(nb->conns_waiting.lh)) &&
1246 list_empty(&(nb->conns_waiting.lh_nextpass))) {
1247 BUG_ON(nb->conns_waiting.priority_sum != 0);
1248 BUG_ON(nb->conns_waiting.cnt != 0);
1249 } else {
1250 BUG_ON(nb->conns_waiting.cnt == 0);
1253 if (list_empty(&(nb->conns_waiting.lh)) &&
1254 list_empty(&(nb->conns_waiting.lh_nextpass)) &&
1255 nb->rb.in_queue == RB_INQUEUE_TRUE) {
1256 nb->rb.in_queue = RB_INQUEUE_FALSE;
1257 list_del(&(nb->rb.lh));
1258 if (atomic_read(&(nb->cmsg_delay_conndata)) != 0) {
1259 atomic_set(&(nb->cmsg_delay_conndata), 0);
1260 sched_cmsg = 1;
1263 krefput_nb = 1;
1265 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1266 list_empty(&(q->neighbors_waiting_nextpass)) &&
1267 q->numconns != 0);
1268 BUG_ON(list_empty(&(q->neighbors_waiting)) &&
1269 list_empty(&(q->neighbors_waiting_nextpass)) &&
1270 q->priority_sum != 0);
1272 cor_qos_queue_set_congstatus(q);
1275 spin_unlock(&(q->qlock));
1276 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
1278 if (sched_cmsg) {
1279 spin_lock_bh(&(nb->cmsg_lock));
1280 cor_schedule_controlmsg_timer(nb);
1281 spin_unlock_bh(&(nb->cmsg_lock));
1284 kref_put(&(trgt_out_lx->ref), cor_kreffree_bug);
1286 if (krefput_nb)
1287 kref_put(&(nb->ref), cor_neighbor_free);
1290 void cor_qos_enqueue_conn(struct cor_conn *trgt_out_lx)
1292 unsigned long iflags;
1293 struct cor_neighbor *nb = trgt_out_lx->target.out.nb;
1294 struct cor_qos_queue *q;
1296 BUG_ON(trgt_out_lx->data_buf.read_remaining == 0);
1298 spin_lock_irqsave(&(nb->conns_waiting.lock), iflags);
1300 if (trgt_out_lx->target.out.rb.in_queue != RB_INQUEUE_FALSE)
1301 goto out;
1303 trgt_out_lx->target.out.rb.in_queue = RB_INQUEUE_TRUE;
1304 list_add_tail(&(trgt_out_lx->target.out.rb.lh),
1305 &(nb->conns_waiting.lh));
1306 kref_get(&(trgt_out_lx->ref));
1307 nb->conns_waiting.cnt++;
1309 q = trgt_out_lx->target.out.nb->queue;
1310 spin_lock(&(q->qlock));
1311 if (nb->rb.in_queue == RB_INQUEUE_TRUE) {
1312 q->numconns++;
1313 } else {
1314 _cor_qos_enqueue(q, &(nb->rb), ns_to_ktime(0),
1315 QOS_CALLER_NEIGHBOR, 0);
1317 spin_unlock(&(q->qlock));
1319 out:
1320 spin_unlock_irqrestore(&(nb->conns_waiting.lock), iflags);
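/*
 * Packet construction: on the wire a cmsg packet carries a 1 byte
 * packet type followed by a 6 byte (48 bit) seqno; a conndata packet
 * carries the packet type, a 4 byte conn_id and a 6 byte seqno, with
 * the type byte encoding the flush and lowbufdelayed flags.
 */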
1323 static struct sk_buff *cor_create_packet(struct cor_neighbor *nb, int size,
1324 gfp_t alloc_flags)
1326 struct sk_buff *ret;
1328 ret = alloc_skb(size + LL_RESERVED_SPACE(nb->dev) +
1329 nb->dev->needed_tailroom, alloc_flags);
1330 if (unlikely(ret == 0))
1331 return 0;
1333 ret->protocol = htons(ETH_P_COR);
1334 ret->dev = nb->dev;
1336 skb_reserve(ret, LL_RESERVED_SPACE(nb->dev));
1337 if (unlikely(dev_hard_header(ret, nb->dev, ETH_P_COR, nb->mac,
1338 nb->dev->dev_addr, ret->len) < 0)) {
kfree_skb(ret);
1339 return 0;
}
1340 skb_reset_network_header(ret);
1342 return ret;
1345 struct sk_buff *cor_create_packet_cmsg(struct cor_neighbor *nb, int size,
1346 gfp_t alloc_flags, __u64 seqno)
1348 struct sk_buff *ret;
1349 char *dest;
1351 ret = cor_create_packet(nb, size + 7, alloc_flags);
1352 if (unlikely(ret == 0))
1353 return 0;
1355 dest = skb_put(ret, 7);
1356 BUG_ON(dest == 0);
1358 dest[0] = PACKET_TYPE_CMSG;
1359 dest += 1;
1361 cor_put_u48(dest, seqno);
1362 dest += 6;
1364 return ret;
1367 struct sk_buff *cor_create_packet_conndata(struct cor_neighbor *nb, int size,
1368 gfp_t alloc_flags, __u32 conn_id, __u64 seqno,
1369 __u8 snd_delayed_lowbuf, __u8 flush)
1371 struct sk_buff *ret;
1372 char *dest;
1374 ret = cor_create_packet(nb, size + 11, alloc_flags);
1375 if (unlikely(ret == 0))
1376 return 0;
1378 dest = skb_put(ret, 11);
1379 BUG_ON(dest == 0);
1381 if (flush != 0) {
1382 if (snd_delayed_lowbuf != 0) {
1383 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH;
1384 } else {
1385 dest[0] = PACKET_TYPE_CONNDATA_FLUSH;
1387 } else {
1388 if (snd_delayed_lowbuf != 0) {
1389 dest[0] = PACKET_TYPE_CONNDATA_LOWBUFDELAYED;
1390 } else {
1391 dest[0] = PACKET_TYPE_CONNDATA;
1394 dest += 1;
1396 cor_put_u32(dest, conn_id);
1397 dest += 4;
1398 cor_put_u48(dest, seqno);
1399 dest += 6;
1401 return ret;
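/*
 * Receive path: cor_rcv() dispatches on the first byte of the frame.
 * Announce packets are handled by cor_rcv_announce(), cmsg packets end
 * up in cor_kernel_packet() and the four conndata packet types map to
 * cor_conn_rcv() with the matching rcv_delayed_lowbuf/flush flags.
 */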
1405 static void cor_rcv_conndata(struct sk_buff *skb, int rcv_delayed_lowbuf,
1406 __u8 flush)
1408 struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);
1410 __u32 conn_id;
1411 __u64 seqno;
1413 char *connid_p;
1414 char *seqno_p;
1416 /* __u8 rand; */
1418 if (unlikely(nb == 0))
1419 goto drop;
1421 connid_p = cor_pull_skb(skb, 4);
1422 if (unlikely(connid_p == 0))
1423 goto drop;
1425 seqno_p = cor_pull_skb(skb, 6);
1426 if (unlikely(seqno_p == 0))
1427 goto drop;
1429 conn_id = cor_parse_u32(connid_p);
1430 seqno = cor_parse_u48(seqno_p);
1432 /* get_random_bytes(&rand, 1);
1433 if (rand < 64)
1434 goto drop; */
1436 if (unlikely(skb->len <= 0))
1437 goto drop;
1439 cor_conn_rcv(nb, skb, 0, 0, conn_id, seqno, rcv_delayed_lowbuf, flush);
1441 if (0) {
1442 drop:
1443 kfree_skb(skb);
1446 if (nb != 0) {
1447 kref_put(&(nb->ref), cor_neighbor_free);
1451 static void cor_rcv_cmsg(struct sk_buff *skb)
1453 struct cor_neighbor *nb = cor_get_neigh_by_mac(skb);
1455 __u64 seqno;
1457 char *seqno_p;
1459 /* __u8 rand; */
1461 if (unlikely(nb == 0))
1462 goto drop;
1464 seqno_p = cor_pull_skb(skb, 6);
1465 if (unlikely(seqno_p == 0))
1466 goto drop;
1468 seqno = cor_parse_u48(seqno_p);
1470 /* get_random_bytes(&rand, 1);
1472 if (rand < 64)
1473 goto drop; */
1475 cor_kernel_packet(nb, skb, seqno);
1477 if (0) {
1478 drop:
1479 kfree_skb(skb);
1482 if (nb != 0) {
1483 kref_put(&(nb->ref), cor_neighbor_free);
1487 static int cor_rcv(struct sk_buff *skb, struct net_device *dev,
1488 struct packet_type *pt, struct net_device *orig_dev)
1490 __u8 packet_type;
1491 char *packet_type_p;
1493 if (skb->pkt_type == PACKET_OTHERHOST ||
1494 unlikely(skb->pkt_type == PACKET_LOOPBACK))
1495 goto drop;
1497 packet_type_p = cor_pull_skb(skb, 1);
1499 if (unlikely(packet_type_p == 0))
1500 goto drop;
1502 packet_type = *packet_type_p;
1504 if (unlikely(packet_type == PACKET_TYPE_ANNOUNCE)) {
1505 cor_rcv_announce(skb);
1506 return NET_RX_SUCCESS;
1507 } else if (packet_type == PACKET_TYPE_CMSG) {
1508 cor_rcv_cmsg(skb);
1509 return NET_RX_SUCCESS;
1510 } else if (packet_type == PACKET_TYPE_CONNDATA) {
1511 cor_rcv_conndata(skb, 0, 0);
1512 return NET_RX_SUCCESS;
1513 } else if (packet_type == PACKET_TYPE_CONNDATA_LOWBUFDELAYED) {
1514 cor_rcv_conndata(skb, 1, 0);
1515 return NET_RX_SUCCESS;
1516 } else if (packet_type == PACKET_TYPE_CONNDATA_FLUSH) {
1517 cor_rcv_conndata(skb, 0, 1);
1518 return NET_RX_SUCCESS;
1519 } else if (packet_type == PACKET_TYPE_CONNDATA_LOWBUFDELAYED_FLUSH) {
1520 cor_rcv_conndata(skb, 1, 1);
1521 return NET_RX_SUCCESS;
1522 } else {
1523 kfree_skb(skb);
1524 return NET_RX_SUCCESS;
1527 drop:
1528 kfree_skb(skb);
1529 return NET_RX_DROP;
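/*
 * Netdevice notifier: NETDEV_UP creates the qos queue for the device
 * and, unless running in client mode, starts broadcasting
 * announcements; NETDEV_DOWN stops the announcements, resets all
 * neighbors reached through the device and destroys its queue.
 */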
1532 int cor_netdev_notify_func(struct notifier_block *not, unsigned long event,
1533 void *ptr)
1535 struct net_device *dev = netdev_notifier_info_to_dev(ptr);
1536 int rc;
1538 switch(event){
1539 case NETDEV_UP:
1540 if (dev->flags & IFF_LOOPBACK)
1541 break;
1543 BUG_ON(dev == 0);
1544 rc = cor_create_queue(dev);
1545 if (rc == 1)
1546 return 1;
1547 if (cor_is_clientmode() == 0)
1548 cor_announce_send_start(dev, dev->broadcast,
1549 ANNOUNCE_TYPE_BROADCAST);
1550 break;
1551 case NETDEV_DOWN:
1552 printk(KERN_ERR "down 1");
1553 udelay(100);
1554 BUG_ON(dev == 0);
1555 printk(KERN_ERR "down 2");
1556 udelay(100);
1557 cor_announce_send_stop(dev, 0, ANNOUNCE_TYPE_BROADCAST);
1558 printk(KERN_ERR "down 3");
1559 udelay(100);
1560 cor_reset_neighbors(dev);
1561 printk(KERN_ERR "down 4");
1562 udelay(100);
1563 cor_destroy_queue(dev);
1564 printk(KERN_ERR "down 5");
1565 udelay(100);
1566 break;
1567 case NETDEV_REBOOT:
1568 case NETDEV_CHANGE:
1569 case NETDEV_REGISTER:
1570 case NETDEV_UNREGISTER:
1571 case NETDEV_CHANGEMTU:
1572 case NETDEV_CHANGEADDR:
1573 case NETDEV_GOING_DOWN:
1574 case NETDEV_CHANGENAME:
1575 case NETDEV_FEAT_CHANGE:
1576 case NETDEV_BONDING_FAILOVER:
1577 break;
1578 default:
1579 return 1;
1582 return 0;
1585 static struct packet_type cor_ptype = {
1586 .type = htons(ETH_P_COR),
1587 .dev = 0,
1588 .func = cor_rcv
1591 void cor_dev_down(void)
1593 if (cor_pack_registered != 0) {
1594 cor_pack_registered = 0;
1595 dev_remove_pack(&cor_ptype);
1598 if (cor_netdev_notify_registered != 0) {
1599 if (unregister_netdevice_notifier(&cor_netdev_notify) != 0) {
1600 printk(KERN_WARNING "warning: cor_dev_down: "
1601 "unregister_netdevice_notifier failed");
1602 BUG();
1604 cor_netdev_notify_registered = 0;
1608 int cor_dev_up(void)
1610 BUG_ON(cor_netdev_notify_registered != 0);
1611 if (register_netdevice_notifier(&cor_netdev_notify) != 0)
1612 return 1;
1613 cor_netdev_notify_registered = 1;
1615 BUG_ON(cor_pack_registered != 0);
1616 dev_add_pack(&cor_ptype);
1617 cor_pack_registered = 1;
1619 return 0;
1622 int __init cor_dev_init(void)
1624 memset(&cor_netdev_notify, 0, sizeof(cor_netdev_notify));
1625 cor_netdev_notify.notifier_call = cor_netdev_notify_func;
1627 return 0;
1630 void __exit cor_dev_exit1(void)
1632 flush_work(&cor_qos_waitexit_work);
1635 MODULE_LICENSE("GPL");