/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/skbuff.h>
#include <linux/mutex.h>
#include <linux/uaccess.h>

#include "cor.h"

struct kmem_cache *data_buf_item_slab;

#define PAGESIZE (1 << PAGE_SHIFT)

void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

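/*
 * Unlink one item from the buffer list and release it: kfree() the flat
 * buffer or kfree_skb() the skb, and give back its overhead accounting.
 */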
static void databuf_item_free(struct conn *cn_l, struct data_buf_item *item)
{
	if (item->type == TYPE_BUF) {
		kfree(item->data.buf.buf);
		cn_l->data_buf.overhead -= item->data.buf.buflen -
				item->data.buf.datalen;
	} else if (item->type == TYPE_SKB) {
		kfree_skb(item->data.skb);
		cn_l->data_buf.overhead -= sizeof(struct sk_buff);
	}

	cn_l->data_buf.overhead -= sizeof(struct data_buf_item);

	list_del(&(item->buf_list));

	kmem_cache_free(data_buf_item_slab, item);
}

void databuf_free(struct conn *cn_l)
{
	while (!list_empty(&(cn_l->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (item->type == TYPE_BUF) {
			cn_l->data_buf.totalsize -= item->data.buf.datalen;
		} else if (item->type == TYPE_SKB) {
			cn_l->data_buf.totalsize -= item->data.skb->len;
		}

		databuf_item_free(cn_l, item);
	}

	BUG_ON(cn_l->data_buf.totalsize != 0);
	BUG_ON(cn_l->data_buf.overhead != 0);

	if (cn_l->data_buf.cpacket_buffer != 0) {
		free_cpacket_buffer(cn_l->data_buf.cpacket_buffer);
		cn_l->data_buf.cpacket_buffer = 0;
	}
}

void reset_seqno(struct conn *cn_l, __u32 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.last_read_offset;
}

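/*
 * Advance data_buf.lastread to the next unread item (or to the first item
 * if nothing has been read yet) and reset last_read_offset.
 */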
static void databuf_nextreadchunk(struct conn *cn_l)
{
	if (cn_l->data_buf.lastread == 0) {
		BUG_ON(cn_l->data_buf.last_read_offset != 0);
		BUG_ON(list_empty(&(cn_l->data_buf.items)));
		cn_l->data_buf.lastread = container_of(cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);
	} else if (&(cn_l->data_buf.lastread->buf_list) !=
			cn_l->data_buf.items.prev) {
		cn_l->data_buf.lastread = container_of(
				cn_l->data_buf.lastread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_l->data_buf.last_read_offset = 0;
}

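/*
 * Copy len bytes from the current read position of the buffer to dst,
 * chunk by chunk; userbuf selects copy_to_user() instead of memcpy().
 */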
static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf)
{
	int rc = 0;

	BUG_ON(cn_l->data_buf.read_remaining < len);

	if (cn_l->data_buf.lastread == 0)
		databuf_nextreadchunk(cn_l);

	while (len > 0) {
		int cpy = len;

		char *srcbuf = 0;
		int srcbuflen = 0;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_l->data_buf.lastread == 0);

		if (cn_l->data_buf.lastread->type == TYPE_BUF) {
			srcbuf = cn_l->data_buf.lastread->data.buf.buf;
			srcbuflen = cn_l->data_buf.lastread->data.buf.datalen;
		} else if (cn_l->data_buf.lastread->type == TYPE_SKB) {
			srcbuf = cn_l->data_buf.lastread->data.skb->data;
			srcbuflen = cn_l->data_buf.lastread->data.skb->len;
		}

		srcbufcpystart = srcbuf + cn_l->data_buf.last_read_offset;
		srcbufcpylen = srcbuflen - cn_l->data_buf.last_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		if (userbuf) {
			int notcopied = copy_to_user(dst, srcbufcpystart, cpy);

			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(dst, srcbufcpystart, cpy);
		}

		dst += cpy;
		len -= cpy;

		cn_l->data_buf.read_remaining -= cpy;
		cn_l->data_buf.last_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_l);

		if (unlikely(rc < 0))
			break;
	}

	return rc;
}

void databuf_pull(struct conn *cn_l, char *dst, int len)
{
	_databuf_pull(cn_l, dst, len, 0);
}

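/*
 * Fill the iovecs of a recvmsg() call from the connection's data buffer.
 */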
size_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg)
{
	ssize_t copied = 0;
	__u32 iovidx = 0;
	__u32 iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		int rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *msg = iov->iov_base + iovread;
		unsigned int len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		if (trgt_sock_l->data_buf.read_remaining == 0) {
			if (trgt_sock_l->sourcetype == SOURCE_NONE)
				copied = -EPIPE;
			break;
		}

		if (len > trgt_sock_l->data_buf.read_remaining)
			len = trgt_sock_l->data_buf.read_remaining;

		rc = _databuf_pull(trgt_sock_l, msg, len, 1);

		if (unlikely(rc < 0)) {
			if (copied == 0)
				copied = rc;
			break;
		}

		copied += len;
		iovread += len;
	}

	return copied;
}

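/*
 * Move the read position back by the given number of bytes, walking the
 * item list backwards if necessary.
 */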
void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(trgt_out_l->data_buf.lastread == 0);

	while (bytes > trgt_out_l->data_buf.last_read_offset) {
		bytes -= trgt_out_l->data_buf.last_read_offset;
		trgt_out_l->data_buf.lastread = container_of(
				trgt_out_l->data_buf.lastread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.lastread->buf_list) ==
				&(trgt_out_l->data_buf.items));
	}

	trgt_out_l->data_buf.last_read_offset -= bytes;
}

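/*
 * Copy data that is still buffered, starting at the absolute position
 * startpos, without changing the current read position.
 */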
void databuf_pullold(struct conn *trgt_out_l, __u32 startpos, char *dst,
		__u32 len)
{
	__u32 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	while (1) {
		int srcbuflen = 0;

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (dbi->type == TYPE_BUF) {
			srcbuflen = dbi->data.buf.datalen;
		} else if (dbi->type == TYPE_SKB) {
			srcbuflen = dbi->data.skb->len;
		}

		if (((__s32) (pos + srcbuflen - startpos)) > 0)
			break;

		pos += srcbuflen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbuf = 0;
		int srcbuflen = 0;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (dbi->type == TYPE_BUF) {
			srcbuf = dbi->data.buf.buf;
			srcbuflen = dbi->data.buf.datalen;
		} else if (dbi->type == TYPE_SKB) {
			srcbuf = dbi->data.skb->data;
			srcbuflen = dbi->data.skb->len;
		}

		BUG_ON(((__s32) (pos - startpos)) > 0);

		srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
		srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;
		pos += srcbuflen;

		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u32 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);
		__u32 firstlen = 0;

		if (firstitem == trgt_out_l->data_buf.lastread)
			break;

		if (firstitem->type == TYPE_BUF) {
			firstlen = firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			firstlen = firstitem->data.skb->len;
		}

		if (((__s32) (trgt_out_l->data_buf.first_offset + firstlen -
				pos)) > 0)
			break;

		trgt_out_l->data_buf.first_offset += firstlen;
		acked += firstlen;

		databuf_item_free(trgt_out_l, firstitem);
	}

	trgt_out_l->data_buf.totalsize -= acked;

	BUG_ON(trgt_out_l->data_buf.totalsize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) {
		__u32 amount = acked > trgt_out_l->data_buf.cpacket_buffer ?
				acked : trgt_out_l->data_buf.cpacket_buffer;
		free_cpacket_buffer(amount);
		trgt_out_l->data_buf.cpacket_buffer -= amount;
	}

	if (trgt_out_l->sourcetype == SOURCE_IN)
		refresh_speedstat(trgt_out_l, acked);
}

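/*
 * Drop everything that has already been read (everything before lastread)
 * and update the size and offset accounting accordingly.
 */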
void databuf_ackread(struct conn *cn_l)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_l->data_buf.items)) &&
			cn_l->data_buf.lastread != 0) {
		struct data_buf_item *firstitem = container_of(
				cn_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_l->data_buf.lastread)
			break;

		if (firstitem->type == TYPE_BUF) {
			acked += firstitem->data.buf.datalen;
		} else if (firstitem->type == TYPE_SKB) {
			acked += firstitem->data.skb->len;
		}

		databuf_item_free(cn_l, firstitem);
	}

	cn_l->data_buf.first_offset += acked;
	cn_l->data_buf.totalsize -= acked;

	BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0);

	if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) {
		__u32 amount = acked > cn_l->data_buf.cpacket_buffer ?
				acked : cn_l->data_buf.cpacket_buffer;
		free_cpacket_buffer(amount);
		cn_l->data_buf.cpacket_buffer -= amount;
	}

	if (cn_l->sourcetype == SOURCE_IN)
		refresh_speedstat(cn_l, acked);
}

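/*
 * Append len bytes to the buffer, reusing free space in the last item or
 * allocating new TYPE_BUF items; maxcpy and maxusage bound how much may be
 * copied and how large the buffer may grow.
 */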
static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf,
		__u32 maxcpy, __u32 maxusage)
{
	struct data_buf_item *item = 0;

	__s64 totalcpy = 0;
	int rc = 0;

	if (list_empty(&(cn_l->data_buf.items)) == 0) {
		struct list_head *last = cn_l->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (len > 0) {
		__u32 cpy = len;

		if (item == 0 || item->type != TYPE_BUF ||
				item->data.buf.buflen <=
				item->data.buf.datalen) {
			__u32 buflen = len;

			if (cn_l->data_buf.totalsize +
					cn_l->data_buf.overhead >
					maxusage) {
				if (maxusage != 0)
					break;
			} else {
				buflen = maxusage - cn_l->data_buf.totalsize -
						cn_l->data_buf.overhead -
						sizeof(struct data_buf_item);
			}

			if (totalcpy + 64 > maxcpy &&
					totalcpy + len > maxcpy)
				break;

			if (totalcpy + buflen < maxcpy)
				buflen = maxcpy - totalcpy;

			if (buflen > PAGESIZE)
				buflen = PAGESIZE;

			item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);
			if (unlikely(item == 0)) {
				rc = -ENOMEM;
				break;
			}

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = TYPE_BUF;
			item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);

			if (unlikely(item->data.buf.buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				rc = -ENOMEM;
				break;
			}

			item->data.buf.datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_l->data_buf.items));
			item->data.buf.buflen = buflen;
			cn_l->data_buf.overhead += buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != TYPE_BUF);
		BUG_ON(item->data.buf.buflen <= item->data.buf.datalen);

		if (item->data.buf.buflen - item->data.buf.datalen < cpy)
			cpy = (item->data.buf.buflen - item->data.buf.datalen);

		if (userbuf) {
			int notcopied = copy_from_user(item->data.buf.buf +
					item->data.buf.datalen, buf, cpy);

			if (unlikely(notcopied > 0))
				rc = -EFAULT;
		} else {
			memcpy(item->data.buf.buf + item->data.buf.datalen,
					buf, cpy);
		}

		buf += cpy;
		len -= cpy;
		totalcpy += cpy;

		cn_l->data_buf.read_remaining += cpy;
		cn_l->data_buf.totalsize += cpy;
		cn_l->data_buf.overhead -= cpy;
		BUG_ON(cn_l->data_buf.totalsize == 0 &&
				cn_l->data_buf.overhead != 0);

		item->data.buf.datalen += cpy;

		if (unlikely(rc < 0))
			break;
	}

	if (unlikely(rc < 0))
		return rc;

	return totalcpy;
}

__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy,
		__u32 maxusage)
{
	__s64 copied = 0;
	__u32 iovidx = 0;
	__u32 iovread = 0;

	while (iovidx < msg->msg_iovlen) {
		__s64 rc;

		struct iovec *iov = msg->msg_iov + iovidx;
		__user char *userbuf = iov->iov_base + iovread;
		__u32 len = iov->iov_len - iovread;

		if (len == 0) {
			iovidx++;
			iovread = 0;
			continue;
		}

		BUG_ON(copied > maxcpy);
		rc = _receive_buf(src_sock_l, userbuf, len, 1, maxcpy - copied,
				maxusage);

		if (unlikely(rc < 0))
			return rc;

		copied += rc;
		iovread += rc;

		if (rc < len)
			break;
	}

	return copied;
}

void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len)
{
	__s64 rc;

	BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer <
			trtg_unconn_l->data_buf.totalsize + len);

	rc = _receive_buf(trtg_unconn_l, buf, len, 0, len, 0);
	BUG_ON(rc < len);
}

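/*
 * Queue a received skb as a TYPE_SKB item and account for its size and
 * overhead.
 */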
int receive_skb(struct conn *src_in_l, struct sk_buff *skb)
{
	struct data_buf_item *item;

	item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);

	if (unlikely(item == 0))
		return 1;

	item->data.skb = skb;
	item->type = TYPE_SKB;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	src_in_l->data_buf.read_remaining += skb->len;
	src_in_l->data_buf.totalsize += skb->len;
	src_in_l->data_buf.overhead += sizeof(struct data_buf_item) +
			sizeof(struct sk_buff);

	BUG_ON(src_in_l->data_buf.totalsize == 0 &&
			src_in_l->data_buf.overhead != 0);

	return 0;
}

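/*
 * Called with rcv_lock held (and released here): drain the out-of-order
 * queue and, if the sender was blocked on the announced window, send it an
 * updated ack.
 */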
static void _wake_sender_in(struct conn *src_in_l)
{
	int windowlimitreached = (src_in_l->source.in.next_seqno ==
			src_in_l->source.in.window_seqnolimit_last);
	struct neighbor *nb = src_in_l->source.in.nb;
	__u32 conn_id = src_in_l->reversedir->target.out.conn_id;
	__u32 next_seqno = src_in_l->source.in.next_seqno;

	__u32 window;
	struct control_msg_out *cm;

	drain_ooo_queue(src_in_l);
	mutex_unlock(&(src_in_l->rcv_lock));

	if (windowlimitreached == 0)
		return;

	window = get_window(src_in_l, nb);

	cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);
	if (unlikely(cm == 0))
		send_ping_all_conns(nb);
	else
		send_ack_conn(cm, src_in_l, conn_id, next_seqno);
}

void wake_sender(struct conn *cn)
{
	mutex_lock(&(cn->rcv_lock));

	switch (cn->sourcetype) {
	case SOURCE_NONE:
		mutex_unlock(&(cn->rcv_lock));
		parse(cn->reversedir, 0);
		break;
	case SOURCE_SOCK:
		wake_up_interruptible(&(cn->source.sock.wait));
		mutex_unlock(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		_wake_sender_in(cn); /* mutex_unlock inside */
		break;
	}
}

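/*
 * Push buffered data towards the target: wake a socket reader or call
 * flush_out() for a network target, then handle congestion/OOM results.
 */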
void flush_buf(struct conn *cn)
{
	int rc = RC_FLUSH_CONN_OUT_OK;

	mutex_lock(&(cn->rcv_lock));

	switch (cn->targettype) {
	case TARGET_UNCONNECTED:
		mutex_unlock(&(cn->rcv_lock));
		break;
	case TARGET_SOCK:
		if (cn->sourcetype != SOURCE_SOCK ||
				cn->source.sock.delay_flush == 0 ||
				cn->data_buf.totalsize +
				cn->data_buf.overhead -
				cn->data_buf.cpacket_buffer >=
				BUFFERLIMIT_SOCK_SOCK/2)
			wake_up_interruptible(&(cn->target.sock.wait));
		mutex_unlock(&(cn->rcv_lock));
		break;
	case TARGET_OUT:
		rc = flush_out(cn, 0, 0);
		mutex_unlock(&(cn->rcv_lock));
		break;
	}

	refresh_conn_credits(cn, 0, 0);
	unreserve_sock_buffer(cn);

	if (rc == RC_FLUSH_CONN_OUT_CONG) {
#warning todo locking
		qos_enqueue(cn->target.out.nb->dev, &(cn->target.out.rb));
	} else if (rc == RC_FLUSH_CONN_OUT_OOM) {
		printk(KERN_DEBUG "oom");
#warning todo locking
		qos_enqueue(cn->target.out.nb->dev, &(cn->target.out.rb));
	} else if (rc == RC_FLUSH_CONN_OUT_OK_SENT) {
	}
}

void __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
}

MODULE_LICENSE("GPL");