/**
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/module.h>
#include <linux/version.h>
#include <linux/kernel.h>
#include <linux/init.h>

#include "cor.h" /* project header with struct conn, struct neighbor, ... */

static atomic_t packets_in_workqueue = ATOMIC_INIT(0);

static atomic_t ooo_packets = ATOMIC_INIT(0);

static struct workqueue_struct *packet_wq;

static struct work_struct outofbufferspace_work;
DEFINE_SPINLOCK(oobss_lock);
static int outofbufferspace_scheduled;

/**
 * Buffering space is divided into 4 areas:
 *
 * buffer_init:
 * distributed equally among all conns; shrinks and grows immediately when
 * there are new connections or connections are reset
 *
 * buffer_speed:
 * distributed proportional to speed; shrinks and grows constantly
 *
 * buffer_ata:
 * used to make noise to make traffic analysis harder; may only shrink if data
 * is either sent on or "stuck" for a long time
 *
 * buffer_reserve:
 * reserve in case sudden shrinking causes some connections to contain more
 * old data than allowed by the first 3 buffers. If this area is full, the
 * out-of-memory conn-resetter is triggered.
 *
 * Each area commits a certain amount of space to each connection. This is the
 * maximum buffer space a connection is allowed to use. The space of a specific
 * connection is first accounted to buffer_ata. If the space it is allowed to
 * use there is exceeded, the rest is accounted to buffer_speed and then to
 * buffer_init. The reserve area is used last; this should only happen if the
 * assigned buffer space of the first 3 areas shrinks suddenly. If the reserve
 * area is used up as well, connections are reset.
 */
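
/*
 * Illustrative example (numbers chosen only for illustration): if a
 * connection has been committed buffer_ata = 16k, buffer_speed = 8k and
 * buffer_init = 4k and currently buffers 20k of data, then usage_ata = 16k,
 * usage_speed = 4k, usage_init = 0 and usage_reserve = 0. If the commitments
 * later shrink to 8k/4k/2k while the 20k are still buffered, 6k spill into
 * usage_reserve. See refresh_bufferusage() below for the exact accounting.
 */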

DEFINE_MUTEX(buffer_conn_list_lock);
LIST_HEAD(buffer_conn_list);

/**
 * used to buffer inserts while the main list is locked; entries are moved to
 * the main list after processing of the main list finishes
 */
LIST_HEAD(buffer_conn_tmp_list); /* protected by bufferlimits_lock */

DEFINE_MUTEX(bufferlimits_lock);

static __u64 bufferassigned_init;
static __u64 bufferassigned_speed;
static __u64 bufferassigned_ata;

static __u64 bufferusage_init;
static __u64 bufferusage_speed;
static __u64 bufferusage_ata;
static __u64 bufferusage_reserve;

DEFINE_SPINLOCK(st_lock);
static struct speedtracker st;
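
/*
 * Computes how much buffer space of one area should be assigned, given the
 * currently assigned space, the current usage and the area limits. The load
 * is scaled so that 128 corresponds to usage == 2/3 of usagelimit; below
 * that the assignment may grow, above it it shrinks (with the divisor
 * growing twice as fast as the load). The result never exceeds assignlimit.
 */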

static __u64 desired_bufferusage(__u64 assigned, __u64 usage, __u64 assignlimit,
		__u64 usagelimit)
{
	__u64 ret;
	__u64 load;

	if (unlikely(assignlimit < usagelimit))
		assignlimit = usagelimit;

	if (multiply_div(usage, 9, 10) > usagelimit)
		return multiply_div(usagelimit, 9, 10);

	load = multiply_div(usage, 192, usagelimit);

	/* slow limit increase, fast decrease */
	if (load == 128) {
		ret = assigned;
	} else if (load < 128) {
		if (unlikely(load == 0))
			return multiply_div(usagelimit, 9, 10);
		ret = multiply_div(assigned, 128, load);
	} else {
		ret = multiply_div(assigned, 128, load + load - 128);
	}

	if (ret > assignlimit)
		return assignlimit;
	return ret;
}
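
/*
 * Speed tracking: st->speed is an exponentially weighted moving average of
 * bytes per jiffy, kept with 16 fractional bits. Bytes of the current,
 * still incomplete jiffy are accumulated in st->bytes_curr and folded into
 * the average once the jiffy has passed; the return value additionally
 * mixes in the bytes of the current jiffy.
 */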

#warning todo changing speed too fast
static __u64 get_speed(struct speedtracker *st, unsigned long jiffies_tmp)
{
	if (unlikely(time_after(st->jiffies_last_update, jiffies_tmp) ||
			time_before(st->jiffies_last_update + HZ*10,
			jiffies_tmp))) {
		st->jiffies_last_update = jiffies_tmp;
		st->speed = 0;
		st->bytes_curr = 0;
		return 0;
	}

	for (;time_before(st->jiffies_last_update, jiffies_tmp);
			st->jiffies_last_update++) {
		__u32 bytes_curr = 0;
		if ((st->jiffies_last_update + 1) == jiffies_tmp) {
			bytes_curr = st->bytes_curr;
			st->bytes_curr = 0;
		}
		st->speed = (st->speed * (HZ-1) + (((__u64)bytes_curr)<<16))/HZ;
	}

	if ((st->jiffies_last_update + 1) == jiffies_tmp) {
		st->jiffies_last_update++;
	}

	return (st->speed * (HZ*2 - 1) + (((__u64)st->bytes_curr) << 17))/HZ/2;
}
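
/*
 * mul() does schoolbook long multiplication on little endian arrays of
 * __u32 digits; it is used below to compare products that would overflow
 * 64 bits.
 */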

/**
 * val1[0], val2[0], res[0] ... least significant
 * val1[val1len-1], val2[val2len-1], res[reslen-1] ... most significant
 */
static void mul(__u32 *val1, unsigned int val1len, __u32 *val2,
		unsigned int val2len, __u32 *res, int reslen)
{
	int digits = val1len + val2len;
	__u64 overflow = 0;
	int i;

	BUG_ON(val1len > 0 && val2len > 0 && reslen < digits);

	memset(res, 0, reslen * sizeof(__u32));

	if (val1len == 0 || val2len == 0)
		return;

	for(i=0;i<digits;i++) {
		int idx1;
		res[i] = (__u32) overflow;
		overflow = overflow >> 32;
		for(idx1=0;idx1<val1len && idx1<=i;idx1++) {
			unsigned int idx2 = i - idx1;
			__u64 tmpres;

			if (idx2 >= val2len)
				continue;

			tmpres = ((__u64) (val1[idx1])) *
					((__u64) (val2[idx2]));
			overflow += tmpres >> 32;
			tmpres = (tmpres << 32) >> 32;
			if (((__u32) (res[i] + tmpres)) < res[i])
				overflow++;
			res[i] += tmpres;
		}
	}

	BUG_ON(overflow != 0);
}
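
/*
 * compare_scores() avoids divisions by cross-multiplying: it compares
 * usage1 * speed2^2 against usage2 * speed1^2 digit by digit, most
 * significant first, so the connection buffering more data relative to its
 * (squared) speed is reported as offending more.
 */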

/**
 * 1 == usage1/speed1 offends more
 * 0 == both offend the same
 * -1 == usage2/speed2 offends more
 */
static int compare_scores(__u32 usage1, __u64 speed1, __u32 usage2,
		__u64 speed2)
{
	int i;

	__u32 speed1_tmp[2];
	__u32 speed2_tmp[2];

	__u32 speed1squared[4];
	__u32 speed2squared[4];

	__u32 speed1squared_usage2[5];
	__u32 speed2squared_usage1[5];

	speed1_tmp[0] = (speed1 << 32) >> 32;
	speed1_tmp[1] = (speed1 >> 32);
	speed2_tmp[0] = (speed2 << 32) >> 32;
	speed2_tmp[1] = (speed2 >> 32);

	mul(speed1_tmp, 2, speed1_tmp, 2, speed1squared, 4);
	mul(speed2_tmp, 2, speed2_tmp, 2, speed2squared, 4);

	mul(speed1squared, 4, &usage2, 1, speed1squared_usage2, 5);
	mul(speed2squared, 4, &usage1, 1, speed2squared_usage1, 5);

	for (i=4;i>=0;i--) {
		if (speed1squared_usage2[i] > speed2squared_usage1[i])
			return -1;
		if (speed1squared_usage2[i] < speed2squared_usage1[i])
			return 1;
	}

	return 0;
}
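
/*
 * Walks buffer_conn_list and collects the OOBS_SIZE connections that score
 * worst according to compare_scores() (slot 0 = worst offender). While
 * bufferusage_reserve is still above 3/4 of BUFFERSPACE_RESERVE, those
 * connections are reset.
 */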

static void _outofbufferspace(void)
{
	int i;

	struct list_head *curr;
	struct conn *offendingconns[OOBS_SIZE];
	__u32 offendingusage[OOBS_SIZE];
	__u64 offendingspeed[OOBS_SIZE];

	memset(&offendingconns, 0, sizeof(offendingconns));

	mutex_lock(&buffer_conn_list_lock);

	curr = buffer_conn_list.next;
	while (curr != &buffer_conn_list) {
		unsigned long iflags;
		struct conn *conn = container_of(curr, struct conn,
				source.in.buffer_list);

		__u32 usage;
		__u64 speed;

		curr = curr->next;

		mutex_lock(&(conn->rcv_lock));

		BUG_ON(conn->sourcetype != SOURCE_IN);

		usage = conn->source.in.usage_reserve;

		spin_lock_irqsave(&st_lock, iflags);
		speed = get_speed(&(conn->source.in.st), jiffies);
		spin_unlock_irqrestore(&st_lock, iflags);

		mutex_unlock(&(conn->rcv_lock));

		if (offendingconns[OOBS_SIZE-1] != 0 && compare_scores(
				offendingusage[OOBS_SIZE-1],
				offendingspeed[OOBS_SIZE-1],
				usage, speed) >= 0)
			continue;

		if (offendingconns[OOBS_SIZE-1] != 0)
			kref_put(&(offendingconns[OOBS_SIZE-1]->ref),
					free_conn);

		kref_get(&(conn->ref));
		offendingconns[OOBS_SIZE-1] = conn;
		offendingusage[OOBS_SIZE-1] = usage;
		offendingspeed[OOBS_SIZE-1] = speed;

		for (i=OOBS_SIZE-2;i>=0;i--) {
			struct conn *tmpconn;
			__u32 usage_tmp;
			__u64 speed_tmp;

			if (offendingconns[i] != 0 && compare_scores(
					offendingusage[i], offendingspeed[i],
					offendingusage[i+1],
					offendingspeed[i+1]) >= 0)
				break;

			tmpconn = offendingconns[i];
			usage_tmp = offendingusage[i];
			speed_tmp = offendingspeed[i];

			offendingconns[i] = offendingconns[i+1];
			offendingusage[i] = offendingusage[i+1];
			offendingspeed[i] = offendingspeed[i+1];

			offendingconns[i+1] = tmpconn;
			offendingusage[i+1] = usage_tmp;
			offendingspeed[i+1] = speed_tmp;
		}
	}

	for (i=0;i<OOBS_SIZE;i++) {
		if (offendingconns[i] == 0)
			break;
		kref_get(&(offendingconns[i]->ref));
	}

	mutex_unlock(&buffer_conn_list_lock);

	for (i=0;i<OOBS_SIZE;i++) {
		int resetneeded;

		if (offendingconns[i] == 0)
			break;

		mutex_lock(&bufferlimits_lock);
		resetneeded = ((bufferusage_reserve*4)/3 > BUFFERSPACE_RESERVE);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded)
			reset_conn(offendingconns[i]);
		kref_put(&(offendingconns[i]->ref), free_conn);
	}

	mutex_lock(&bufferlimits_lock);
	mutex_lock(&buffer_conn_list_lock);
	while (list_empty(&buffer_conn_tmp_list) == 0) {
		curr = buffer_conn_tmp_list.next;
		list_del(curr);
		list_add(curr, &buffer_conn_list);
	}
	mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
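
/*
 * Work item entry point: keeps calling _outofbufferspace() until the
 * reserve area is no longer overcommitted, then clears
 * outofbufferspace_scheduled so refresh_bufferusage() can schedule it again.
 */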

static void outofbufferspace(struct work_struct *work)
{
	while (1) {
		unsigned long iflags;
		int resetneeded;

		mutex_lock(&bufferlimits_lock);
		spin_lock_irqsave(&oobss_lock, iflags);
		resetneeded = (bufferusage_reserve > BUFFERSPACE_RESERVE);

		if (resetneeded == 0)
			outofbufferspace_scheduled = 0;

		spin_unlock_irqrestore(&oobss_lock, iflags);
		mutex_unlock(&bufferlimits_lock);

		if (resetneeded == 0)
			return;

		_outofbufferspace();
	}
}
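
/*
 * Distributes the totalsize of the connection's data_buf over the four
 * areas in the order ata -> speed -> init -> reserve, each part capped by
 * the space committed to this connection in that area, and updates the
 * global usage counters accordingly. Schedules the out-of-buffer-space
 * worker when the reserve area runs over BUFFERSPACE_RESERVE.
 */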

static void refresh_bufferusage(struct conn *rconn)
{
	BUG_ON(rconn->sourcetype != SOURCE_IN);

	bufferusage_init -= rconn->source.in.usage_init;
	bufferusage_speed -= rconn->source.in.usage_speed;
	bufferusage_ata -= rconn->source.in.usage_ata;
	bufferusage_reserve -= rconn->source.in.usage_reserve;

	rconn->source.in.usage_ata = rconn->data_buf.totalsize;
	if (rconn->source.in.usage_ata > rconn->source.in.buffer_ata)
		rconn->source.in.usage_ata = rconn->source.in.buffer_ata;

	if (rconn->source.in.usage_ata == rconn->data_buf.totalsize)
		rconn->source.in.usage_speed = 0;
	else
		rconn->source.in.usage_speed = rconn->data_buf.totalsize -
				rconn->source.in.usage_ata;

	if (rconn->source.in.usage_speed > rconn->source.in.buffer_speed)
		rconn->source.in.usage_speed = rconn->source.in.buffer_speed;

	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed) ==
			rconn->data_buf.totalsize)
		rconn->source.in.usage_init = 0;
	else
		rconn->source.in.usage_init = rconn->data_buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed;

	if (rconn->source.in.usage_init > rconn->source.in.buffer_init)
		rconn->source.in.usage_init = rconn->source.in.buffer_init;

	if ((rconn->source.in.usage_ata + rconn->source.in.usage_speed +
			rconn->source.in.usage_init) ==
			rconn->data_buf.totalsize)
		rconn->source.in.usage_reserve = 0;
	else
		rconn->source.in.usage_reserve = rconn->data_buf.totalsize -
				rconn->source.in.usage_ata -
				rconn->source.in.usage_speed -
				rconn->source.in.usage_init;

	bufferusage_init += rconn->source.in.usage_init;
	bufferusage_speed += rconn->source.in.usage_speed;
	bufferusage_ata += rconn->source.in.usage_ata;
	bufferusage_reserve += rconn->source.in.usage_reserve;

	if (bufferusage_reserve > BUFFERSPACE_RESERVE) {
		unsigned long iflags;
		spin_lock_irqsave(&oobss_lock, iflags);
		if (outofbufferspace_scheduled == 0) {
			schedule_work(&outofbufferspace_work);
			outofbufferspace_scheduled = 1;
		}
		spin_unlock_irqrestore(&oobss_lock, iflags);
	}
}
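
/*
 * The announced window is the space still committed but unused across the
 * init, speed and ata areas (zero as soon as the reserve area is in use),
 * capped at MAX_ANNOUNCE_WINDOW and encoded with enc_window().
 */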

static __u8 __get_window(struct conn *rconn)
{
	__u64 window = 0;

	if (rconn->source.in.usage_reserve != 0)
		return 0;

	BUG_ON(rconn->source.in.usage_init > rconn->source.in.buffer_init);
	BUG_ON(rconn->source.in.usage_speed > rconn->source.in.buffer_speed);
	BUG_ON(rconn->source.in.usage_ata > rconn->source.in.buffer_ata);

	window += rconn->source.in.buffer_init;
	window += rconn->source.in.buffer_speed;
	window += rconn->source.in.buffer_ata;

	window -= rconn->source.in.usage_init;
	window -= rconn->source.in.usage_speed;
	window -= rconn->source.in.usage_ata;

	if (window > MAX_ANNOUNCE_WINDOW)
		window = MAX_ANNOUNCE_WINDOW;

	return enc_window(window);
}
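
/*
 * Recomputes this connection's share of the init and speed areas from the
 * global assignment/usage counters, refreshes its buffer usage and returns
 * the (encoded) window. Also keeps the connection on buffer_conn_list, or
 * on buffer_conn_tmp_list when the main list lock could not be taken.
 */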

static __u32 _get_window(struct conn *rconn, int listlocked)
{
	unsigned long iflags;

	unsigned long jiffies_tmp;

	__u32 window = 0;

	__u32 conns;
	__u64 bufferlimit_init;
	__u64 connlimit_init;

	__u64 totalspeed;
	__u64 bufferlimit_speed;
	__u64 connlimit_speed;

	mutex_lock(&(rconn->rcv_lock));

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (atomic_read(&(rconn->isreset)) != 0) {
		if (listlocked && (rconn->source.in.buffer_list.next != 0 ||
				rconn->source.in.buffer_list.prev != 0)) {
			list_del(&(rconn->source.in.buffer_list));
			rconn->source.in.buffer_list.next = 0;
			rconn->source.in.buffer_list.prev = 0;
			kref_put(&(rconn->ref), free_conn);
		}
		goto out;
	}

	if (listlocked) {
		if (rconn->source.in.buffer_list.next != 0 ||
				rconn->source.in.buffer_list.prev != 0) {
			list_del(&(rconn->source.in.buffer_list));
		} else {
			kref_get(&(rconn->ref));
		}
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_list);
	} else if (rconn->source.in.buffer_list.next == 0 &&
			rconn->source.in.buffer_list.prev == 0) {
		kref_get(&(rconn->ref));
		list_add_tail(&(rconn->source.in.buffer_list),
				&buffer_conn_tmp_list);
	}

	conns = atomic_read(&num_conns);

	bufferlimit_init = desired_bufferusage(bufferassigned_init,
			bufferusage_init, BUFFERASSIGN_INIT, BUFFERSPACE_INIT);
	connlimit_init = (bufferlimit_init + conns - 1) / conns;

	bufferassigned_init -= rconn->source.in.buffer_init;
	if (((__u32) connlimit_init) != connlimit_init)
		rconn->source.in.buffer_init = -1;
	else
		rconn->source.in.buffer_init = (__u32) connlimit_init;
	bufferassigned_init += rconn->source.in.buffer_init;

	spin_lock_irqsave(&st_lock, iflags);
	jiffies_tmp = jiffies;
	totalspeed = get_speed(&st, jiffies_tmp);
	bufferlimit_speed = desired_bufferusage(bufferassigned_speed,
			bufferusage_speed, BUFFERASSIGN_SPEED,
			BUFFERSPACE_SPEED);
	connlimit_speed = multiply_div(bufferlimit_speed,
			get_speed(&(rconn->source.in.st), jiffies_tmp),
			totalspeed);
	spin_unlock_irqrestore(&st_lock, iflags);

	bufferassigned_speed -= rconn->source.in.buffer_speed;
	if (((__u32) connlimit_speed) != connlimit_speed)
		rconn->source.in.buffer_speed = -1;
	else
		rconn->source.in.buffer_speed = (__u32) connlimit_speed;
	bufferassigned_speed += rconn->source.in.buffer_speed;

	refresh_bufferusage(rconn);

	window = __get_window(rconn);

	rconn->source.in.window_seqnolimit_last = rconn->source.in.next_seqno +
			dec_window(window);

	if (((__s32) (rconn->source.in.window_seqnolimit_last -
			rconn->source.in.window_seqnolimit_max)) > 0)
		rconn->source.in.window_seqnolimit_max =
				rconn->source.in.window_seqnolimit_last;

out:
	mutex_unlock(&(rconn->rcv_lock));

	return window;
}

/* do not hold rcv_lock while calling this */
__u8 get_window(struct conn *rconn)
{
	struct conn *rconn2;
	int listlocked;

	__u32 window;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);

	window = _get_window(rconn, listlocked);

	if (listlocked) {
		/*
		 * refresh window of idle conns as well to keep global counters
		 * accurate
		 */

		rconn2 = container_of(buffer_conn_list.next, struct conn,
				source.in.buffer_list);

		if (list_empty(&buffer_conn_list) == 0 && rconn2 != rconn)
			_get_window(rconn2, listlocked);

		if (list_empty(&buffer_conn_tmp_list) == 0) {
			rconn2 = container_of(buffer_conn_tmp_list.next,
					struct conn, source.in.buffer_list);
			BUG_ON(rconn2 == rconn);
			_get_window(rconn2, listlocked);
		}

		mutex_unlock(&buffer_conn_list_lock);
	}

	mutex_unlock(&bufferlimits_lock);

	return window;
}
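
/*
 * Removes the connection's contributions from the global usage and
 * assignment counters and takes it off the buffer list; only does something
 * for conns with sourcetype SOURCE_IN.
 */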

void reset_bufferusage(struct conn *conn)
{
	int listlocked;

	mutex_lock(&bufferlimits_lock);
	listlocked = mutex_trylock(&buffer_conn_list_lock);
	mutex_lock(&(conn->rcv_lock));

	if (conn->sourcetype != SOURCE_IN)
		goto out;

	bufferusage_init -= conn->source.in.usage_init;
	bufferusage_speed -= conn->source.in.usage_speed;
	bufferusage_ata -= conn->source.in.usage_ata;
	bufferusage_reserve -= conn->source.in.usage_reserve;

	bufferassigned_init -= conn->source.in.buffer_init;
	bufferassigned_speed -= conn->source.in.buffer_speed;
	bufferassigned_ata -= conn->source.in.buffer_ata;

	if (listlocked && (conn->source.in.buffer_list.next != 0 ||
			conn->source.in.buffer_list.prev != 0)) {
		list_del(&(conn->source.in.buffer_list));
		conn->source.in.buffer_list.next = 0;
		conn->source.in.buffer_list.prev = 0;
		kref_put(&(conn->ref), free_conn);
	}

out:
	mutex_unlock(&(conn->rcv_lock));
	if (listlocked)
		mutex_unlock(&buffer_conn_list_lock);
	mutex_unlock(&bufferlimits_lock);
}
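
/*
 * Accounts `written` bytes to both the per-connection and the global
 * speedtracker under st_lock, bringing the averages up to date first and
 * saturating bytes_curr instead of letting it wrap.
 */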

void refresh_speedstat(struct conn *rconn, __u32 written)
{
	unsigned long iflags;
	unsigned long jiffies_tmp;

	spin_lock_irqsave(&st_lock, iflags);

	jiffies_tmp = jiffies;

	if (rconn->source.in.st.jiffies_last_update != jiffies_tmp)
		get_speed(&(rconn->source.in.st), jiffies_tmp);
	if (rconn->source.in.st.bytes_curr + written < written)
		rconn->source.in.st.bytes_curr = -1;
	else
		rconn->source.in.st.bytes_curr += written;

	if (st.jiffies_last_update != jiffies_tmp)
		get_speed(&st, jiffies_tmp);
	if (st.bytes_curr + written < written)
		st.bytes_curr = -1;
	else
		st.bytes_curr += written;

	spin_unlock_irqrestore(&st_lock, iflags);
}
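
/*
 * Delivers packets from the reorder queue for as long as the head of the
 * queue matches next_seqno, i.e. previously out-of-order data that has
 * become in-order, and decrements the corresponding ooo counters.
 */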

void drain_ooo_queue(struct conn *rconn)
{
	struct sk_buff *skb;

	BUG_ON(SOURCE_IN != rconn->sourcetype);

	skb = rconn->source.in.reorder_queue.next;

	while ((void *) skb != (void *) &(rconn->source.in.reorder_queue)) {
		struct skb_procstate *ps = skb_pstate(skb);
		int drop;

		if (rconn->source.in.next_seqno != ps->funcstate.rcv2.seqno)
			break;

		drop = receive_skb(rconn, skb);
		if (drop)
			break;

		skb_unlink(skb, &(rconn->source.in.reorder_queue));
		rconn->source.in.ooo_packets--;
		atomic_dec(&(rconn->source.in.nb->ooo_packets));
		atomic_dec(&ooo_packets);

		rconn->source.in.next_seqno += skb->len;

		skb = rconn->source.in.reorder_queue.next;
	}
}
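
/*
 * Queues an out-of-order packet into the reorder queue at a position
 * determined by its seqno. Returns 0 on success and 1 if the packet has to
 * be dropped because one of the per-conn, per-neighbor or global
 * out-of-order limits is exceeded.
 */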

static int _conn_rcv_ooo(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct sk_buff_head *reorder_queue = &(rconn->source.in.reorder_queue);
	struct sk_buff *curr = reorder_queue->next;

	int ooo;

#warning todo limit amount of data, not packet count
	rconn->source.in.ooo_packets++;
	if (rconn->source.in.ooo_packets > MAX_TOTAL_OOO_PER_CONN)
		goto drop_conn;

	ooo = atomic_inc_return(&(rconn->source.in.nb->ooo_packets));
	if (ooo > MAX_TOTAL_OOO_PER_NEIGH)
		goto drop_neigh;

	ooo = atomic_inc_return(&ooo_packets);
	if (ooo > MAX_TOTAL_OOO_PACKETS)
		goto drop_global;

	while (1) {
		struct skb_procstate *ps2 = skb_pstate(curr);

		if ((void *) curr == (void *) reorder_queue) {
			skb_queue_tail(reorder_queue, skb);
			break;
		}

		if (ps->funcstate.rcv2.seqno > ps2->funcstate.rcv2.seqno) {
			skb_insert(curr, skb, reorder_queue);
			break;
		}

		curr = curr->next;
	}

	return 0;

drop_global:
	atomic_dec(&ooo_packets);
drop_neigh:
	atomic_dec(&(rconn->source.in.nb->ooo_packets));
drop_conn:
	rconn->source.in.ooo_packets--;

	return 1;
}
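
/*
 * Receives one data packet for a connection: in-order data is handed to
 * receive_skb() and acknowledged with send_ack_conn() after draining the
 * reorder queue; out-of-order data goes through _conn_rcv_ooo() and is
 * acknowledged with send_ack_conn_ooo(). The control message for the ack is
 * allocated up front so the function can bail out early if none is
 * available.
 */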

#warning todo check if in window
static void _conn_rcv(struct conn *rconn, struct sk_buff *skb)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct control_msg_out *cm = alloc_control_msg(rconn->source.in.nb,
			ACM_PRIORITY_MED);

	int in_order;
	int drop;

	__u32 len = skb->len;

	BUG_ON(rconn->sourcetype != SOURCE_IN);

	if (unlikely(cm == 0)) {
		kfree_skb(skb);
		return;
	}

	mutex_lock(&(rconn->rcv_lock));

	in_order = (rconn->source.in.next_seqno == ps->funcstate.rcv2.seqno);

	if (in_order == 0) {
		drop = _conn_rcv_ooo(rconn, skb);
	} else {
		rconn->source.in.next_seqno += skb->len;
		drop = receive_skb(rconn, skb);
	}

	if (drop) {
		kfree_skb(skb);
		free_control_msg(cm);
		goto out;
	}

	if (in_order == 0) {
		send_ack_conn_ooo(cm, rconn,
				rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno,
				ps->funcstate.rcv2.seqno, len);
	} else {
		drain_ooo_queue(rconn);
		send_ack_conn(cm, rconn, rconn->reversedir->target.out.conn_id,
				rconn->source.in.next_seqno);
	}

out:
	mutex_unlock(&(rconn->rcv_lock));
}

static void conn_rcv(struct sk_buff *skb, __u32 conn_id, __u32 seqno)
{
	struct conn *rconn;
	struct skb_procstate *ps = skb_pstate(skb);

	ps->funcstate.rcv2.seqno = seqno;

	rconn = get_conn(conn_id);

	if (unlikely(rconn == 0)) {
		printk(KERN_DEBUG "unknown conn_id when receiving: %d\n",
				conn_id);
		kfree_skb(skb);
		return;
	}

	_conn_rcv(rconn, skb);
	kref_put(&(rconn->ref), free_conn);
}

void conn_rcv_buildskb(char *data, __u32 datalen, __u32 conn_id, __u32 seqno)
{
	struct sk_buff *skb = alloc_skb(datalen, GFP_KERNEL);
	char *dst = skb_put(skb, datalen);
	memcpy(dst, data, datalen);
	conn_rcv(skb, conn_id, seqno);
}
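
/*
 * Parses the 4 byte connection id and 4 byte sequence number (big endian)
 * from the packet, then dispatches either to kernel_packet() for the
 * sending neighbor or to conn_rcv().
 */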

static void rcv_data(struct sk_buff *skb)
{
	__u32 conn_id;
	__u32 seqno;

	char *connid_p = cor_pull_skb(skb, 4);
	char *seqno_p = cor_pull_skb(skb, 4);

	if (unlikely(connid_p == 0 || seqno_p == 0))
		goto drop;

	((char *)&conn_id)[0] = connid_p[0];
	((char *)&conn_id)[1] = connid_p[1];
	((char *)&conn_id)[2] = connid_p[2];
	((char *)&conn_id)[3] = connid_p[3];

	((char *)&seqno)[0] = seqno_p[0];
	((char *)&seqno)[1] = seqno_p[1];
	((char *)&seqno)[2] = seqno_p[2];
	((char *)&seqno)[3] = seqno_p[3];

	conn_id = be32_to_cpu(conn_id);
	seqno = be32_to_cpu(seqno);

	/* get_random_bytes(&rand, 1);

	if (rand < 64) {
		printk(KERN_ERR "drop %d %d %d %d %d", conn_id, seqno_p[0],
				seqno_p[1], seqno_p[2], seqno_p[3]);
		goto drop;
	} */

	if (conn_id == 0) {
		struct neighbor *nb = get_neigh_by_mac(skb);
		if (unlikely(nb == 0))
			goto drop;
		kernel_packet(nb, skb, seqno);
		kref_put(&(nb->ref), neighbor_free);
	} else {
		conn_rcv(skb, conn_id, seqno);
	}

	if (0) {
drop:
		kfree_skb(skb);
	}
}

static void rcv(struct work_struct *work)
{
	struct sk_buff *skb = skb_from_pstate(container_of(work,
			struct skb_procstate, funcstate.rcv.work));

	__u8 packet_type;
	char *packet_type_p;

	atomic_dec(&packets_in_workqueue);

	packet_type_p = cor_pull_skb(skb, 1);

	if (unlikely(packet_type_p == 0))
		goto drop;

	packet_type = *packet_type_p;

	if (packet_type == PACKET_TYPE_ANNOUNCE) {
		rcv_announce(skb);
		return;
	}

	if (unlikely(packet_type != PACKET_TYPE_DATA))
		goto drop;

	rcv_data(skb);

	if (0) {
drop:
		kfree_skb(skb);
	}
}
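
/*
 * Packet type handler registered via ptype_cor: hands each skb to the
 * cor_packet workqueue and drops packets once more than
 * MAX_PACKETS_IN_RCVQUEUE are queued.
 */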

static int queue_rcv_processing(struct sk_buff *skb, struct net_device *dev,
		struct packet_type *pt, struct net_device *orig_dev)
{
	struct skb_procstate *ps = skb_pstate(skb);
	int queuelen;

	if (skb->pkt_type == PACKET_OTHERHOST)
		goto drop;

	BUG_ON(skb->next != 0);

	queuelen = atomic_inc_return(&packets_in_workqueue);

	BUG_ON(queuelen <= 0);

#warning todo limit per interface, inbound credits
	if (queuelen > MAX_PACKETS_IN_RCVQUEUE) {
		atomic_dec(&packets_in_workqueue);
		goto drop;
	}

	INIT_WORK(&(ps->funcstate.rcv.work), rcv);
	queue_work(packet_wq, &(ps->funcstate.rcv.work));
	return NET_RX_SUCCESS;

drop:
	kfree_skb(skb);
	return NET_RX_DROP;
}

static struct packet_type ptype_cor = {
	.type = htons(ETH_P_COR),
	.func = queue_rcv_processing
};
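
/*
 * Module init for the receive path: resets the global buffer counters and
 * the global speedtracker, creates the packet workqueue and registers the
 * ETH_P_COR packet type handler.
 */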

int __init cor_rcv_init(void)
{
	bufferassigned_init = 0;
	bufferassigned_speed = 0;
	bufferassigned_ata = 0;

	bufferusage_init = 0;
	bufferusage_speed = 0;
	bufferusage_ata = 0;
	bufferusage_reserve = 0;

	memset(&st, 0, sizeof(struct speedtracker));

	BUG_ON(sizeof(struct skb_procstate) > 48);
	packet_wq = create_workqueue("cor_packet");
	INIT_WORK(&outofbufferspace_work, outofbufferspace);
	outofbufferspace_scheduled = 0;

	dev_add_pack(&ptype_cor);
	return 0;
}

MODULE_LICENSE("GPL");