drain_ooo_queue bugfix
[cor_2_6_31.git] / net / cor / rcv.c
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2011 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/module.h>
22 #include <linux/version.h>
23 #include <linux/kernel.h>
24 #include <linux/init.h>
25 #include <linux/in.h>
28 #include "cor.h"
30 static atomic_t packets_in_workqueue = ATOMIC_INIT(0);
32 static atomic_t ooo_packets = ATOMIC_INIT(0);
34 static struct workqueue_struct *packet_wq;
36 static struct work_struct outofbufferspace_work;
37 DEFINE_SPINLOCK(oobss_lock);
38 static int outofbufferspace_scheduled;
40 /**
41 * buffering space is divided into 4 areas:
43 * buffer_init:
44 * distributed equally among all conns; shrinks and grows immediately when there
45 * are new connections or connections are reset
47 * buffer_speed:
48 * distributed proportional to speed; shrinks and grows constantly
50 * buffer_ata:
51 * used to make noise to make traffic analysis harder; may only shrink if data
52 * is either sent on or "stuck" for a long time
54 * buffer_reserve:
55 * reserve for the case where sudden shrinking causes some connections to contain
56 * more old data than allowed by the first 3 buffers. If this area is full, the
57 * out-of-memory conn-resetter is triggered
59 * Each area commits a certain amount of space to each connection. This is the
60 * maximum buffer space a connection is allowed to use. The space of a specific
61 * connection is first accounted to buffer_ata. If the space allowed by that
62 * area is exceeded, the rest is accounted to buffer_speed and then buffer_init.
63 * The reserve area will be used last. This should only be the case if the
64 * assigned buffer space of the first 3 areas shrinks suddenly. If this area is
65 * also used up, connections will be reset.
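/*
 * Example with made-up figures: buffer_ata == 1000, buffer_speed == 1500,
 * buffer_init == 2000 and 5000 bytes buffered give usage_ata == 1000,
 * usage_speed == 1500, usage_init == 2000 and usage_reserve == 500 (see
 * refresh_bufferusage() below).
 */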
68 DEFINE_MUTEX(buffer_conn_list_lock);
69 LIST_HEAD(buffer_conn_list);
71 /**
72 * used to buffer inserts while the main list is locked; entries are moved to
73 * the main list after processing of the main list finishes
75 LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */
78 DEFINE_MUTEX(bufferlimits_lock);
80 static __u64 bufferassigned_init;
81 static __u64 bufferassigned_speed;
82 static __u64 bufferassigned_ata;
84 static __u64 bufferusage_init;
85 static __u64 bufferusage_speed;
86 static __u64 bufferusage_ata;
87 static __u64 bufferusage_reserve;
89 DEFINE_SPINLOCK(st_lock);
90 static struct speedtracker st;
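/*
 * desired_bufferusage() rescales the space assigned to an area based on how
 * full it is: load is usage scaled so that 128 means 2/3 of usagelimit
 * (usage * 192 / usagelimit). Below 128 the assignment grows, at most by 4/3
 * since load is clamped to >= 96; above 128 it shrinks more aggressively
 * (factor 128 / (2 * load - 128)). The result is capped at assignlimit, and
 * at 9/10 of usagelimit when the area is empty or clearly overfull.
 */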
92 static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
93 __u64 usagelimit)
95 __u64 ret;
96 __u64 load;
98 if (unlikely(assignlimit < usagelimit))
99 assignlimit = usagelimit;
101 if (multiply_div(usage, 9, 10) > usagelimit)
102 return multiply_div(usagelimit, 9, 10);
104 load = multiply_div(usage, 192, usagelimit);
106 /* slow limit increase, fast decrease */
107 if (load == 128) {
108 ret = assigned;
109 } else if (load < 128) {
110 if (load == 0)
111 return multiply_div(usagelimit, 9, 10);
113 if (load < 96)
114 load = 96;
115 ret = multiply_div(assigned, 128, load);
116 } else {
117 ret = multiply_div(assigned, 128, load + load - 128);
120 if (ret > assignlimit)
121 return assignlimit;
122 return ret;
125 /* return 65536 == 1byte/HZ */
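/*
 * get_speed() keeps a decaying per-jiffy average: for every elapsed jiffy
 * speed becomes (speed * (HZ*9 - 1) + (bytes_curr << 16)) / (HZ*10). The
 * tracker restarts if its timestamp is in the future or more than 10 seconds
 * old. The return value additionally folds in the not yet complete current
 * jiffy with double weight (<< 17 instead of << 16).
 */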
126 static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
128 if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
129 time_before(st->jiffies_last_update + HZ*10,
130 jiffies_tmp))) {
131 st->jiffies_last_update = jiffies_tmp;
132 st->speed = 0;
133 st->bytes_curr = 0;
134 return 0;
137 for (;time_before(st->jiffies_last_update, jiffies_tmp);
138 st->jiffies_last_update++) {
139 st->speed = ( st->speed * (HZ*9-1) +
140 (((__u64)st->bytes_curr)<<16) ) /
141 (HZ*10);
142 st->bytes_curr = 0;
145 return ( st->speed * (HZ*9 - 1) + (((__u64)st->bytes_curr) << 17) ) /
146 (HZ*10);
150 * val1[0], val2[0], res[0] ... least significant
151 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
153 static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
154 unsigned int val2len, __u32 *res, int reslen)
156 int digits = val1len + val2len;
157 __u64 overflow = 0;
158 int i;
160 BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);
162 memset(res, 0, reslen * sizeof(__u32));
164 if (val1len == 0 || val2len == 0)
165 return;
167 for(i=0;i<digits;i++) {
168 int idx1;
169 res[i] = (__u32) overflow;
170 overflow = overflow >> 32;
171 for(idx1=0;idx1<val1len && idx1<=i;idx1++) {
172 int idx2 = i - idx1;
173 __u64 tmpres;
175 if (idx2 >= val2len)
176 continue;
178 tmpres = ((__u64) (val1[idx1])) *
179 ((__u64) (val2[idx2]));
180 overflow += tmpres >> 32;
181 tmpres = (tmpres << 32) >> 32;
182 if (res[i] + tmpres < res[i])
183 overflow++;
184 res[i] += tmpres;
187 BUG_ON(overflow != 0);
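/*
 * compare_scores() effectively ranks connections by usage / speed^2 (a lot
 * of buffered data at low throughput offends most). To avoid overflow the
 * comparison is done as usage1 * speed2^2 vs. usage2 * speed1^2, computed
 * exactly in 32-bit limbs (up to 160 bits) with mul() above.
 */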
191 * return values:
192 * 1 == usage1/speed1 offends more
193 * 0 == both offend the same
194 * -1 == usage2/speed2 offends more
196 static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
197 __u64 speed2)
199 int i;
201 __u32 speed1squared[4];
202 __u32 speed2squared[4];
204 __u32 speed1squared_usage2[5];
205 __u32 speed2squared_usage1[5];
208 __u32 speed1_tmp[2];
209 __u32 speed2_tmp[2];
211 speed1_tmp[0] = (speed1 << 32) >> 32;
212 speed1_tmp[1] = (speed1 >> 32);
213 speed2_tmp[0] = (speed2 << 32) >> 32;
214 speed2_tmp[1] = (speed2 >> 32);
216 mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared,4);
217 mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);
219 mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
220 mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);
222 for(i=4;i>=0;i--) {
223 if (speed1squared_usage2[i] > speed2squared_usage1[i])
224 return -1;
225 if (speed1squared_usage2[i] < speed2squared_usage1[i])
226 return 1;
229 return 0;
232 #define OOBS_SIZE 10
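/*
 * _outofbufferspace() scans buffer_conn_list and keeps the OOBS_SIZE worst
 * offenders (as ranked by compare_scores) in a small array kept sorted by
 * the insertion step below. References are taken while the list lock is
 * held; afterwards the offenders are reset one by one for as long as
 * bufferusage_reserve is still above 3/4 of BUFFERSPACE_RESERVE. Finally,
 * conns parked on buffer_conn_tmp_list are moved back to the main list.
 */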
233 static void _outofbufferspace(void)
235 int i;
237 struct list_head *curr;
238 struct conn *offendingconns[OOBS_SIZE];
239 __u32 offendingusage[OOBS_SIZE];
240 __u64 offendingspeed[OOBS_SIZE];
242 memset(&offendingconns, 0, sizeof(offendingconns));
244 mutex_lock(&buffer_conn_list_lock);
246 curr = buffer_conn_list.next;
247 while (curr != &buffer_conn_list) {
248 unsigned long iflags;
250 struct conn *src_in_o = container_of(curr, struct conn,
251 source.in.buffer_list);
253 __u32 usage;
254 __u64 speed;
256 int i;
258 curr = curr->next;
260 mutex_lock(&(src_in_o->rcv_lock));
262 BUG_ON(src_in_o->sourcetype != SOURCE_IN);
264 usage = src_in_o->source.in.usage_reserve;
266 spin_lock_irqsave(&st_lock, iflags);
267 speed = get_speed(&(src_in_o->source.in.st), jiffies);
268 spin_unlock_irqrestore(&st_lock, iflags);
270 mutex_unlock(&(src_in_o->rcv_lock));
272 if (offendingconns[OOBS_SIZE-1] != 0 &&
273 compare_scores(
274 offendingusage[OOBS_SIZE-1],
275 offendingspeed[OOBS_SIZE-1],
276 usage, speed) >= 0)
277 continue;
279 offendingconns[OOBS_SIZE-1] = src_in_o;
280 offendingusage[OOBS_SIZE-1] = usage;
281 offendingspeed[OOBS_SIZE-1] = speed;
283 for (i=OOBS_SIZE-2;i>=0;i--) {
284 struct conn *tmpconn;
285 __u32 usage_tmp;
286 __u64 speed_tmp;
288 if (offendingconns[i] != 0 && compare_scores(
289 offendingusage[i], offendingspeed[i],
290 offendingusage[i+1],
291 offendingspeed[i+1]) >= 0)
292 break;
294 tmpconn = offendingconns[i];
295 usage_tmp = offendingusage[i];
296 speed_tmp = offendingspeed[i];
298 offendingconns[i] = offendingconns[i+1];
299 offendingusage[i] = offendingusage[i+1];
300 offendingspeed[i] = offendingspeed[i+1];
302 offendingconns[i+1] = tmpconn;
303 offendingusage[i+1] = usage_tmp;
304 offendingspeed[i+1] = speed_tmp;
308 for (i=0;i<OOBS_SIZE;i++) {
309 if (offendingconns[i] == 0)
310 break;
311 kref_get(&(offendingconns[i]->ref));
314 mutex_unlock(&buffer_conn_list_lock);
316 for (i=0;i<OOBS_SIZE;i++) {
317 int resetneeded;
319 if (offendingconns[i] == 0)
320 break;
322 mutex_lock(&bufferlimits_lock);
323 resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
324 mutex_unlock(&bufferlimits_lock);
326 if (resetneeded)
327 reset_conn(offendingconns[i]);
328 kref_put(&(offendingconns[i]->ref), free_conn);
331 mutex_lock(&bufferlimits_lock);
332 mutex_lock(&buffer_conn_list_lock);
333 while(list_empty(&buffer_conn_tmp_list) == 0) {
334 curr = buffer_conn_tmp_list.next;
335 list_del(curr);
336 list_add(curr, &buffer_conn_list);
338 mutex_unlock(&buffer_conn_list_lock);
339 mutex_unlock(&bufferlimits_lock);
342 static void outofbufferspace(struct work_struct *work)
344 while (1) {
345 unsigned long iflags;
346 int resetneeded;
348 mutex_lock(&bufferlimits_lock);
349 spin_lock_irqsave(&oobss_lock, iflags);
350 resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);
352 if (resetneeded == 0)
353 outofbufferspace_scheduled = 0;
355 spin_unlock_irqrestore(&oobss_lock, iflags);
356 mutex_unlock(&bufferlimits_lock);
358 if (resetneeded == 0)
359 return;
361 _outofbufferspace();
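/*
 * refresh_bufferusage() recomputes how the conn's buffered bytes are split
 * across the areas (ata first, then speed, init and finally reserve, each
 * clamped to the committed amount) and updates the global usage counters.
 * When the reserve limit is exceeded, the out-of-buffer-space worker is
 * scheduled. The caller is expected to hold bufferlimits_lock and the conn's
 * rcv_lock, as _get_window() does.
 */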
365 static void refresh_bufferusage(struct conn *src_in_l)
367 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
369 bufferusage_init -= src_in_l->source.in.usage_init;
370 bufferusage_speed -= src_in_l->source.in.usage_speed;
371 bufferusage_ata -= src_in_l->source.in.usage_ata;
372 bufferusage_reserve -= src_in_l->source.in.usage_reserve;
374 src_in_l->source.in.usage_ata = src_in_l->data_buf.totalsize;
375 if (src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata)
376 src_in_l->source.in.usage_ata = src_in_l->source.in.buffer_ata;
379 if (src_in_l->source.in.usage_ata == src_in_l->data_buf.totalsize)
380 src_in_l->source.in.usage_speed = 0;
381 else
382 src_in_l->source.in.usage_speed = src_in_l->data_buf.totalsize -
383 src_in_l->source.in.usage_ata;
385 if (src_in_l->source.in.usage_speed > src_in_l->source.in.buffer_speed)
386 src_in_l->source.in.usage_speed =
387 src_in_l->source.in.buffer_speed;
390 if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed) ==
391 src_in_l->data_buf.totalsize)
392 src_in_l->source.in.usage_init = 0;
393 else
394 src_in_l->source.in.usage_init = src_in_l->data_buf.totalsize -
395 src_in_l->source.in.usage_ata -
396 src_in_l->source.in.usage_speed;
398 if (src_in_l->source.in.usage_init > src_in_l->source.in.buffer_init)
399 src_in_l->source.in.usage_init =
400 src_in_l->source.in.buffer_init;
403 if ((src_in_l->source.in.usage_ata + src_in_l->source.in.usage_speed +
404 src_in_l->source.in.usage_init) ==
405 src_in_l->data_buf.totalsize)
406 src_in_l->source.in.usage_reserve = 0;
407 else
408 src_in_l->source.in.usage_reserve =
409 src_in_l->data_buf.totalsize -
410 src_in_l->source.in.usage_ata -
411 src_in_l->source.in.usage_speed -
412 src_in_l->source.in.usage_init;
414 bufferusage_init += src_in_l->source.in.usage_init;
415 bufferusage_speed += src_in_l->source.in.usage_speed;
416 bufferusage_ata += src_in_l->source.in.usage_ata;
417 bufferusage_reserve += src_in_l->source.in.usage_reserve;
419 if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
420 unsigned long iflags;
421 spin_lock_irqsave(&oobss_lock, iflags);
422 if (outofbufferspace_scheduled == 0) {
423 schedule_work(&outofbufferspace_work);
424 outofbufferspace_scheduled = 1;
427 spin_unlock_irqrestore(&oobss_lock, iflags);
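/*
 * The announced window is whatever the three committed areas still have free
 * (buffer_* minus usage_*); it drops to zero as soon as any reserve space is
 * in use. The value is capped at MAX_ANNOUNCE_WINDOW and encoded
 * logarithmically via enc_log_64_11().
 */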
431 static __u8 __get_window(struct conn *src_in_l)
433 __u64 window = 0;
435 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
437 if (src_in_l->source.in.usage_reserve != 0)
438 return 0;
440 BUG_ON(src_in_l->source.in.usage_init >
441 src_in_l->source.in.buffer_init);
442 BUG_ON(src_in_l->source.in.usage_speed >
443 src_in_l->source.in.buffer_speed);
444 BUG_ON(src_in_l->source.in.usage_ata > src_in_l->source.in.buffer_ata);
446 window += src_in_l->source.in.buffer_init;
447 window += src_in_l->source.in.buffer_speed;
448 window += src_in_l->source.in.buffer_ata;
450 window -= src_in_l->source.in.usage_init;
451 window -= src_in_l->source.in.usage_speed;
452 window -= src_in_l->source.in.usage_ata;
454 if (window > MAX_ANNOUNCE_WINDOW)
455 window = MAX_ANNOUNCE_WINDOW;
457 return enc_log_64_11(window);
460 #warning todo upper buffer limits
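/*
 * _get_window() re-shares the buffer areas: buffer_init is split equally
 * among all conns (rounded up: e.g. 1000000 bytes shared by 8 conns give
 * 125000 each), buffer_speed proportionally to the conn's measured speed
 * relative to the global speedtracker. The conn is (re)queued at the tail of
 * buffer_conn_list, or on buffer_conn_tmp_list while the main list is busy;
 * usage is refreshed and window_seqnolimit is set to next_seqno plus the
 * decoded window.
 */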
461 static __u8 _get_window(struct conn *cn, struct neighbor *expectedsender,
462 __u32 expected_connid, int from_acksend, int listlocked)
464 unsigned long iflags;
466 unsigned long jiffies_tmp;
468 __u8 window = 0;
470 __s32 conns;
471 __u64 bufferlimit_init;
472 __u64 connlimit_init;
474 __u64 totalspeed;
475 __u64 bufferlimit_speed;
476 __u64 connlimit_speed;
478 mutex_lock(&(cn->rcv_lock));
480 BUG_ON(expectedsender == 0 && cn->sourcetype != SOURCE_IN);
482 if (unlikely(unlikely(cn->sourcetype != SOURCE_IN) ||
483 unlikely(expectedsender != 0 && (cn->source.in.nb !=
484 expectedsender || cn->reversedir->target.out.conn_id !=
485 expected_connid))))
486 goto out;
488 if (unlikely(cn->isreset != 0)) {
489 if (listlocked && (cn->source.in.buffer_list.next != 0 ||
490 cn->source.in.buffer_list.prev != 0)) {
491 list_del(&(cn->source.in.buffer_list));
492 cn->source.in.buffer_list.next = 0;
493 cn->source.in.buffer_list.prev = 0;
494 kref_put(&(cn->ref), free_conn);
496 goto out;
499 if (listlocked){
500 if (cn->source.in.buffer_list.next != 0 ||
501 cn->source.in.buffer_list.prev != 0) {
502 list_del(&(cn->source.in.buffer_list));
503 } else {
504 kref_get(&(cn->ref));
506 list_add_tail(&(cn->source.in.buffer_list),
507 &buffer_conn_list);
508 } else if (cn->source.in.buffer_list.next == 0 &&
509 cn->source.in.buffer_list.prev == 0) {
510 kref_get(&(cn->ref));
511 list_add_tail(&(cn->source.in.buffer_list),
512 &buffer_conn_tmp_list);
516 conns = atomic_read(&num_conns);
517 BUG_ON(conns < 0);
518 bufferlimit_init = desired_bufferusage(bufferassigned_init,
519 bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
520 connlimit_init = (bufferlimit_init + conns - 1) / conns;
522 bufferassigned_init -= cn->source.in.buffer_init;
523 if (((__u32) connlimit_init) != connlimit_init)
524 cn->source.in.buffer_init = -1;
525 else
526 cn->source.in.buffer_init = (__u32) connlimit_init;
527 bufferassigned_init += cn->source.in.buffer_init;
529 spin_lock_irqsave(&st_lock, iflags);
530 jiffies_tmp = jiffies;
531 totalspeed = get_speed(&st, jiffies_tmp);
532 bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
533 bufferusage_speed, BUFFERASSIGN_SPEED,
534 BUFFERSPACE_SPEED);
535 connlimit_speed = multiply_div(bufferlimit_speed,
536 get_speed(&(cn->source.in.st), jiffies_tmp),
537 totalspeed);
538 spin_unlock_irqrestore(&st_lock, iflags);
540 bufferassigned_speed -= cn->source.in.buffer_speed;
541 if (((__u32) connlimit_speed) != connlimit_speed)
542 cn->source.in.buffer_speed = -1;
543 else
544 cn->source.in.buffer_speed = (__u32) connlimit_speed;
545 bufferassigned_speed += cn->source.in.buffer_speed;
547 refresh_bufferusage(cn);
549 window = __get_window(cn);
551 cn->source.in.window_seqnolimit = cn->source.in.next_seqno +
552 dec_log_64_11(window);
554 if (from_acksend)
555 cn->source.in.window_seqnolimit_remote =
556 cn->source.in.window_seqnolimit;
557 else
558 send_ack_conn_ifneeded(cn);
560 out:
561 mutex_unlock(&(cn->rcv_lock));
563 return window;
566 __u8 get_window(struct conn *cn, struct neighbor *expectedsender,
567 __u32 expected_connid, int from_acksend)
569 struct conn *cn2;
570 int listlocked;
572 __u8 window;
574 mutex_lock(&bufferlimits_lock);
575 listlocked = mutex_trylock(&buffer_conn_list_lock);
577 window = _get_window(cn, expectedsender, expected_connid, from_acksend,
578 listlocked);
580 if (listlocked) {
582 * refresh window of idle conns as well to keep global counters
583 * accurate
586 cn2 = container_of(buffer_conn_list.next, struct conn,
587 source.in.buffer_list);
589 if (list_empty(&buffer_conn_list) == 0 && cn2 != cn)
590 _get_window(cn2, 0, 0, 0, listlocked);
593 if (list_empty(&buffer_conn_tmp_list) == 0) {
594 cn2 = container_of(buffer_conn_tmp_list.next,
595 struct conn, source.in.buffer_list);
596 BUG_ON(cn2 == cn);
597 _get_window(cn2, 0, 0, 0, listlocked);
600 mutex_unlock(&buffer_conn_list_lock);
603 mutex_unlock(&bufferlimits_lock);
605 return window;
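/*
 * reset_bufferusage() backs a conn out of the global accounting: its usage
 * and assignments are subtracted from the counters and, if the list lock
 * could be taken, it is unlinked from buffer_conn_list and the list's
 * reference is dropped.
 */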
608 void reset_bufferusage(struct conn *cn)
610 int listlocked;
612 mutex_lock(&bufferlimits_lock);
613 listlocked = mutex_trylock(&buffer_conn_list_lock);
614 mutex_lock(&(cn->rcv_lock));
616 if (cn->sourcetype != SOURCE_IN)
617 goto out;
619 bufferusage_init -= cn->source.in.usage_init;
620 bufferusage_speed -= cn->source.in.usage_speed;
621 bufferusage_ata -= cn->source.in.usage_ata;
622 bufferusage_reserve -= cn->source.in.usage_reserve;
624 bufferassigned_init -= cn->source.in.buffer_init;
625 bufferassigned_speed -= cn->source.in.buffer_speed;
626 bufferassigned_ata -= cn->source.in.buffer_ata;
628 if (listlocked && (cn->source.in.buffer_list.next != 0 ||
629 cn->source.in.buffer_list.prev != 0)) {
630 list_del(&(cn->source.in.buffer_list));
631 cn->source.in.buffer_list.next = 0;
632 cn->source.in.buffer_list.prev = 0;
633 kref_put(&(cn->ref), free_conn);
636 out:
637 mutex_unlock(&(cn->rcv_lock));
638 if (listlocked)
639 mutex_unlock(&buffer_conn_list_lock);
640 mutex_unlock(&bufferlimits_lock);
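/*
 * refresh_speedstat() credits "written" bytes to both the per-conn and the
 * global speedtracker under st_lock; bytes_curr saturates instead of
 * wrapping around.
 */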
643 void refresh_speedstat(struct conn *src_in_l, __u32 written)
645 unsigned long iflags;
646 unsigned long jiffies_tmp;
648 spin_lock_irqsave(&st_lock, iflags);
650 jiffies_tmp = jiffies;
652 if (src_in_l->source.in.st.jiffies_last_update != jiffies_tmp)
653 get_speed(&(src_in_l->source.in.st), jiffies_tmp);
654 if (src_in_l->source.in.st.bytes_curr + written < written)
655 src_in_l->source.in.st.bytes_curr = -1;
656 else
657 src_in_l->source.in.st.bytes_curr += written;
659 if (st.jiffies_last_update != jiffies_tmp)
660 get_speed(&st, jiffies_tmp);
661 if (st.bytes_curr + written < written)
662 st.bytes_curr = -1;
663 else
664 st.bytes_curr += written;
666 spin_unlock_irqrestore(&st_lock, iflags);
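/*
 * Deliver packets from the reorder queue for as long as the head matches
 * next_seqno; stop at the first gap or when receive_skb() refuses one.
 */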
669 void drain_ooo_queue(struct conn *src_in_l)
671 struct sk_buff *skb;
673 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
675 skb = src_in_l->source.in.reorder_queue.next;
677 while ((void *) skb != (void *) &(src_in_l->source.in.reorder_queue)) {
678 struct skb_procstate *ps = skb_pstate(skb);
679 int drop;
681 if (src_in_l->source.in.next_seqno != ps->funcstate.rcv2.seqno)
682 break;
684 drop = receive_skb(src_in_l, skb);
685 if (drop)
686 break;
688 skb_unlink(skb, &(src_in_l->source.in.reorder_queue));
689 src_in_l->source.in.ooo_packets--;
690 atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
691 atomic_dec(&ooo_packets);
693 src_in_l->source.in.next_seqno += skb->len;
skb = src_in_l->source.in.reorder_queue.next;
697 #warning todo overlapping seqno rcv
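/*
 * Throw away everything still sitting in the reorder queue and drop the
 * corresponding ooo counters, e.g. when the connection is being reset.
 */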
698 void reset_ooo_queue(struct conn *src_in_l)
700 BUG_ON(src_in_l->sourcetype != SOURCE_IN);
702 while (skb_queue_empty(&(src_in_l->source.in.reorder_queue)) == 0) {
703 struct sk_buff *skb = src_in_l->source.in.reorder_queue.next;
704 skb_unlink(skb, &(src_in_l->source.in.reorder_queue));
705 kfree_skb(skb);
706 src_in_l->source.in.ooo_packets--;
707 atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
708 atomic_dec(&ooo_packets);
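/*
 * Queue an out-of-order packet: enforce the per-conn, per-neighbor and
 * global ooo limits, then insert the skb into the reorder queue in sequence
 * number order.
 */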
712 static int _conn_rcv_ooo(struct conn *src_in_l, struct sk_buff *skb)
714 struct skb_procstate *ps = skb_pstate(skb);
715 struct sk_buff_head *reorder_queue =
716 &(src_in_l->source.in.reorder_queue);
717 struct sk_buff *curr = reorder_queue->next;
719 long ooo;
721 #warning todo limit amount of data, not packet count
722 src_in_l->source.in.ooo_packets++;
723 if (src_in_l->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
724 goto drop_ooo3;
726 ooo = atomic_inc_return(&(src_in_l->source.in.nb->ooo_packets));
727 if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
728 goto drop_ooo2;
730 ooo = atomic_inc_return(&ooo_packets);
731 if (ooo > MAX_TOTAL_OOO_PACKETS)
732 goto drop_ooo1;
735 while (1) {
736 struct skb_procstate *ps2 = skb_pstate(curr);
738 if ((void *) curr == (void *) reorder_queue) {
739 skb_queue_tail(reorder_queue, skb);
740 break;
743 if (((__s32) (ps->funcstate.rcv2.seqno - ps2->funcstate.rcv2.seqno)) < 0) {
744 skb_insert(curr, skb, reorder_queue);
745 break;
748 curr = curr->next;
751 if (0) {
752 drop_ooo1:
753 atomic_dec(&ooo_packets);
754 drop_ooo2:
755 atomic_dec(&(src_in_l->source.in.nb->ooo_packets));
756 drop_ooo3:
757 src_in_l->source.in.ooo_packets--;
759 return 1;
762 return 0;
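/*
 * _conn_rcv() checks that the packet really belongs to this conn and fits
 * into the announced window. In-order data is handed to receive_skb() and
 * the reorder queue is drained afterwards; out-of-order data is queued and
 * acknowledged immediately via an ACM_PRIORITY_LOW control message.
 */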
765 static void _conn_rcv(struct neighbor *nb, struct conn *src_in,
766 struct sk_buff *skb, __u32 conn_id)
768 struct skb_procstate *ps = skb_pstate(skb);
770 int in_order;
771 int drop = 1;
773 __u32 len = skb->len;
775 mutex_lock(&(src_in->rcv_lock));
777 if (unlikely(unlikely(src_in->isreset != 0) ||
778 unlikely(src_in->sourcetype != SOURCE_IN) ||
779 unlikely(src_in->source.in.conn_id != conn_id)))
780 goto out;
782 if (nb == 0) {
783 if (unlikely(is_from_nb(skb, src_in->source.in.nb) == 0))
784 goto out;
785 } else {
786 if (unlikely(src_in->source.in.nb != nb))
787 goto out;
790 set_last_act(src_in);
792 if (((__s32) (ps->funcstate.rcv2.seqno + skb->len -
793 src_in->source.in.window_seqnolimit)) > 0)
794 goto out;
796 in_order = (src_in->source.in.next_seqno == ps->funcstate.rcv2.seqno);
797 if (in_order == 0) {
798 drop = _conn_rcv_ooo(src_in, skb);
799 } else {
800 src_in->source.in.next_seqno += skb->len;
801 drop = receive_skb(src_in, skb);
804 if (drop)
805 goto out;
807 if (in_order == 0) {
808 struct control_msg_out *cm =
809 alloc_control_msg(src_in->source.in.nb,
810 ACM_PRIORITY_LOW);
811 send_ack_conn_ooo(cm, src_in,
812 src_in->reversedir->target.out.conn_id,
813 ps->funcstate.rcv2.seqno, len);
814 } else {
815 drain_ooo_queue(src_in);
816 src_in->source.in.inorder_ack_needed = 1;
819 out:
820 send_ack_conn_ifneeded(src_in);
822 mutex_unlock(&(src_in->rcv_lock));
824 if (unlikely(drop)) {
825 kfree_skb(skb);
826 } else if (in_order == 1) {
827 flush_buf(src_in);
831 static void conn_rcv(struct neighbor *nb, struct sk_buff *skb, __u32 conn_id,
832 __u32 seqno)
834 struct conn *cn_src_in;
835 struct skb_procstate *ps = skb_pstate(skb);
837 memset(ps, 0, sizeof(struct skb_procstate));
839 ps->funcstate.rcv2.seqno = seqno;
841 cn_src_in = get_conn(conn_id);
843 if (unlikely(cn_src_in == 0)) {
844 int nbput = 0;
845 printk(KERN_DEBUG "unknown conn_id when receiving: %u\n",
846 conn_id);
848 if (nb == 0) {
849 nb = get_neigh_by_mac(skb);
850 nbput = 1;
853 kfree_skb(skb);
854 if (likely(nb != 0)) {
855 send_connid_unknown(nb, conn_id);
856 if (nbput) {
857 kref_put(&(nb->ref), neighbor_free);
860 return;
862 _conn_rcv(nb, cn_src_in, skb, conn_id);
863 kref_put(&(cn_src_in->ref), free_conn);
866 void conn_rcv_buildskb(struct neighbor *nb, char *data, __u32 datalen,
867 __u32 conn_id, __u32 seqno)
869 struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
char *dst;

/* silently drop the data if no memory is available */
if (unlikely(skb == 0))
return;

dst = skb_put(skb, datalen);
871 memcpy(dst, data, datalen);
872 conn_rcv(nb, skb, conn_id, seqno);
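/*
 * Data packets start with a 4 byte connection id followed by a 4 byte
 * sequence number, both big endian. conn_id 0 addresses the kernel packet
 * handler of the neighbor instead of a conn.
 */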
875 static void rcv_data(struct sk_buff *skb)
877 __u32 conn_id;
878 __u32 seqno;
880 char *connid_p = cor_pull_skb(skb, 4);
881 char *seqno_p = cor_pull_skb(skb, 4);

/* too short to be a data packet */
if (unlikely(connid_p == 0 || seqno_p == 0))
goto drop;
883 /* __u8 rand; */
885 ((char *)&conn_id)[0] = connid_p[0];
886 ((char *)&conn_id)[1] = connid_p[1];
887 ((char *)&conn_id)[2] = connid_p[2];
888 ((char *)&conn_id)[3] = connid_p[3];
890 ((char *)&seqno)[0] = seqno_p[0];
891 ((char *)&seqno)[1] = seqno_p[1];
892 ((char *)&seqno)[2] = seqno_p[2];
893 ((char *)&seqno)[3] = seqno_p[3];
895 conn_id = be32_to_cpu(conn_id);
896 seqno = be32_to_cpu(seqno);
898 /* get_random_bytes(&rand, 1);
900 if (rand < 64) {
901 printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
902 seqno_p[1], seqno_p[2], seqno_p[3]);
903 goto drop;
904 } */
906 if (conn_id == 0) {
907 struct neighbor *nb = get_neigh_by_mac(skb);
908 if (unlikely(nb == 0))
909 goto drop;
910 kernel_packet(nb, skb, seqno);
911 kref_put(&(nb->ref), neighbor_free);
912 } else {
913 conn_rcv(0, skb, conn_id, seqno);
916 if (0) {
917 drop:
918 kfree_skb(skb);
922 static void rcv(struct work_struct *work)
924 struct sk_buff *skb = skb_from_pstate(container_of(work,
925 struct skb_procstate, funcstate.rcv.work));
927 __u8 packet_type;
928 char *packet_type_p;
930 atomic_dec(&packets_in_workqueue);
932 packet_type_p = cor_pull_skb(skb, 1);
934 if (unlikely(packet_type_p == 0))
935 goto drop;
937 packet_type = *packet_type_p;
939 if (packet_type == PACKET_TYPE_ANNOUNCE) {
940 rcv_announce(skb);
941 return;
944 if (unlikely(packet_type != PACKET_TYPE_DATA))
945 goto drop;
947 rcv_data(skb);
949 if (0) {
950 drop:
951 kfree_skb(skb);
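/*
 * queue_rcv_processing() is the dev_add_pack() hook for ETH_P_COR: it drops
 * packets meant for other hosts, bounds the backlog to
 * MAX_PACKETS_IN_RCVQUEUE and defers the actual parsing to the cor_packet
 * workqueue (rcv() above).
 */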
955 static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
956 struct packet_type *pt, struct net_device *orig_dev)
958 struct skb_procstate *ps = skb_pstate(skb);
959 long queuelen;
961 if (skb->pkt_type == PACKET_OTHERHOST)
962 goto drop;
964 BUG_ON(skb->next != 0);
966 queuelen = atomic_inc_return(&packets_in_workqueue);
968 BUG_ON(queuelen <= 0);
970 #warning todo limit per interface, inbound credits
971 if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
972 atomic_dec(&packets_in_workqueue);
973 goto drop;
976 INIT_WORK(&(ps->funcstate.rcv.work), rcv);
977 queue_work(packet_wq, &(ps->funcstate.rcv.work));
978 return NET_RX_SUCCESS;
980 drop:
981 kfree_skb(skb);
982 return NET_RX_DROP;
985 static struct packet_type ptype_cor = {
986 .type = htons(ETH_P_COR),
987 .dev = 0,
988 .func = queue_rcv_processing
991 int __init cor_rcv_init(void)
993 bufferassigned_init = 0;
994 bufferassigned_speed = 0;
995 bufferassigned_ata = 0;
997 bufferusage_init = 0;
998 bufferusage_speed = 0;
999 bufferusage_ata = 0;
1000 bufferusage_reserve = 0;
1002 memset(&st, 0, sizeof(struct speedtracker));
1004 BUG_ON(sizeof(struct skb_procstate) > 48);
1005 packet_wq = create_workqueue("cor_packet");
if (packet_wq == 0)
return -ENOMEM;
1006 INIT_WORK(&outofbufferspace_work, outofbufferspace);
1007 outofbufferspace_scheduled = 0;
1009 dev_add_pack(&ptype_cor);
1010 return 0;
1013 MODULE_LICENSE("GPL");