/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
/* slab cache backing struct data_buf_item allocations; created in
 * forward_init() */
struct kmem_cache *data_buf_item_slab;
27 void databuf_init(struct conn
*cn_init
)
29 memset(&(cn_init
->data_buf
), 0, sizeof(cn_init
->data_buf
));
30 INIT_LIST_HEAD(&(cn_init
->data_buf
.items
));
33 static inline void databuf_item_unlink(struct conn
*cn_l
, struct data_buf_item
*item
)
35 list_del(&(item
->buf_list
));
36 if (item
->type
== TYPE_BUF
) {
37 cn_l
->data_buf
.overhead
-= sizeof(struct data_buf_item
) +
38 item
->buflen
- item
->datalen
;
39 } else if (item
->type
== TYPE_SKB
) {
40 cn_l
->data_buf
.overhead
-= sizeof(struct sk_buff
);
46 void databuf_ackdiscard(struct conn
*cn_l
)
49 while (!list_empty(&(cn_l
->data_buf
.items
))) {
50 struct data_buf_item
*item
= container_of(
51 cn_l
->data_buf
.items
.next
,
52 struct data_buf_item
, buf_list
);
53 freed
+= item
->datalen
;
55 databuf_item_unlink(cn_l
, item
);
56 databuf_item_free(item
);
59 cn_l
->data_buf
.totalsize
-= freed
;
60 cn_l
->data_buf
.first_offset
+= freed
;
62 BUG_ON(cn_l
->data_buf
.totalsize
!= 0);
63 BUG_ON(cn_l
->data_buf
.overhead
!= 0);
65 if (cn_l
->data_buf
.cpacket_buffer
!= 0) {
66 free_cpacket_buffer(cn_l
->data_buf
.cpacket_buffer
);
67 cn_l
->data_buf
.cpacket_buffer
= 0;
70 cn_l
->data_buf
.read_remaining
= 0;
71 cn_l
->data_buf
.last_read_offset
= 0;
72 cn_l
->data_buf
.lastread
= 0;
74 if (cn_l
->isreset
== 0 && cn_l
->sourcetype
== SOURCE_IN
)
75 refresh_speedstat(cn_l
, freed
);
78 void reset_seqno(struct conn
*cn_l
, __u32 initseqno
)
80 cn_l
->data_buf
.first_offset
= initseqno
-
81 cn_l
->data_buf
.totalsize
+
82 cn_l
->data_buf
.read_remaining
;
85 static void databuf_nextreadchunk(struct conn
*cn_l
)
87 if (cn_l
->data_buf
.lastread
== 0) {
88 BUG_ON(cn_l
->data_buf
.last_read_offset
!= 0);
89 BUG_ON(list_empty(&(cn_l
->data_buf
.items
)));
90 cn_l
->data_buf
.lastread
= container_of(
91 cn_l
->data_buf
.items
.next
,
92 struct data_buf_item
, buf_list
);
93 } else if (&(cn_l
->data_buf
.lastread
->buf_list
) !=
94 cn_l
->data_buf
.items
.prev
) {
95 cn_l
->data_buf
.lastread
= container_of(
96 cn_l
->data_buf
.lastread
->buf_list
.next
,
97 struct data_buf_item
, buf_list
);
99 cn_l
->data_buf
.last_read_offset
= 0;
103 void databuf_pull(struct conn
*cn_l
, char *dst
, int len
)
105 BUG_ON(cn_l
->data_buf
.read_remaining
< len
);
107 if (cn_l
->data_buf
.lastread
== 0)
108 databuf_nextreadchunk(cn_l
);
113 char *srcbufcpystart
= 0;
114 int srcbufcpylen
= 0;
116 BUG_ON(cn_l
->data_buf
.lastread
== 0);
118 srcbufcpystart
= cn_l
->data_buf
.lastread
->buf
+
119 cn_l
->data_buf
.last_read_offset
;
120 srcbufcpylen
= cn_l
->data_buf
.lastread
->datalen
-
121 cn_l
->data_buf
.last_read_offset
;
123 if (cpy
> srcbufcpylen
)
126 memcpy(dst
, srcbufcpystart
, cpy
);
131 cn_l
->data_buf
.read_remaining
-= cpy
;
132 cn_l
->data_buf
.last_read_offset
+= cpy
;
134 if (cpy
== srcbufcpylen
)
135 databuf_nextreadchunk(cn_l
);
139 void databuf_pull_dbi(struct cor_sock
*cs_rl
, struct conn
*trgt_sock_l
)
141 struct data_buf_item
*dbi
= 0;
142 BUG_ON(cs_rl
->type
!= SOCKTYPE_CONN
);
143 BUG_ON(cs_rl
->data
.conn
.rcvitem
!= 0);
145 if (trgt_sock_l
->data_buf
.read_remaining
== 0)
148 if (trgt_sock_l
->data_buf
.lastread
== 0)
149 databuf_nextreadchunk(trgt_sock_l
);
151 dbi
= trgt_sock_l
->data_buf
.lastread
;
155 BUG_ON(&(dbi
->buf_list
) != trgt_sock_l
->data_buf
.items
.next
);
157 cs_rl
->data
.conn
.rcvitem
= dbi
;
158 cs_rl
->data
.conn
.rcvoffset
= trgt_sock_l
->data_buf
.last_read_offset
;
160 trgt_sock_l
->data_buf
.first_offset
+= dbi
->datalen
;
161 trgt_sock_l
->data_buf
.totalsize
-= dbi
->datalen
;
162 trgt_sock_l
->data_buf
.read_remaining
-= dbi
->datalen
;
164 if (unlikely(trgt_sock_l
->data_buf
.cpacket_buffer
!= 0)) {
165 __u32 amount
= dbi
->datalen
<
166 trgt_sock_l
->data_buf
.cpacket_buffer
?
168 trgt_sock_l
->data_buf
.cpacket_buffer
;
169 free_cpacket_buffer(amount
);
170 trgt_sock_l
->data_buf
.cpacket_buffer
-= amount
;
173 databuf_item_unlink(trgt_sock_l
, dbi
);
175 trgt_sock_l
->data_buf
.lastread
= 0;
176 trgt_sock_l
->data_buf
.last_read_offset
= 0;
178 /* databuf_item_free(firstitem); */
181 void databuf_unpull(struct conn
*trgt_out_l
, __u32 bytes
)
183 trgt_out_l
->data_buf
.read_remaining
+= bytes
;
185 BUG_ON(trgt_out_l
->data_buf
.lastread
== 0);
187 while (bytes
> trgt_out_l
->data_buf
.last_read_offset
) {
188 bytes
-= trgt_out_l
->data_buf
.last_read_offset
;
189 trgt_out_l
->data_buf
.lastread
= container_of(
190 trgt_out_l
->data_buf
.lastread
->buf_list
.prev
,
191 struct data_buf_item
, buf_list
);
192 BUG_ON(&(trgt_out_l
->data_buf
.lastread
->buf_list
) ==
193 &(trgt_out_l
->data_buf
.items
));
196 trgt_out_l
->data_buf
.last_read_offset
-= bytes
;
199 void databuf_pullold(struct conn
*trgt_out_l
, __u32 startpos
, char *dst
,
202 __u32 pos
= trgt_out_l
->data_buf
.first_offset
;
203 struct data_buf_item
*dbi
= container_of(
204 trgt_out_l
->data_buf
.items
.next
,
205 struct data_buf_item
, buf_list
);
208 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
210 if (((__s32
) (pos
+ dbi
->datalen
- startpos
)) > 0)
214 dbi
= container_of(dbi
->buf_list
.next
, struct data_buf_item
,
221 char *srcbufcpystart
= 0;
222 int srcbufcpylen
= 0;
224 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
226 BUG_ON(((__s32
) (pos
- startpos
)) > 0);
228 srcbufcpystart
= dbi
->buf
+ ((__s32
) (startpos
- pos
));
229 srcbufcpylen
= dbi
->datalen
- ((__s32
) (startpos
- pos
));
231 if (cpy
> srcbufcpylen
)
234 memcpy(dst
, srcbufcpystart
, cpy
);
241 dbi
= container_of(dbi
->buf_list
.next
, struct data_buf_item
,
246 /* ack up to *not* including pos */
247 void databuf_ack(struct conn
*trgt_out_l
, __u32 pos
)
251 while (!list_empty(&(trgt_out_l
->data_buf
.items
))) {
252 struct data_buf_item
*firstitem
= container_of(
253 trgt_out_l
->data_buf
.items
.next
,
254 struct data_buf_item
, buf_list
);
256 if (firstitem
== trgt_out_l
->data_buf
.lastread
)
259 if ( ((__s32
) (trgt_out_l
->data_buf
.first_offset
+
260 firstitem
->datalen
- pos
)) > 0)
263 trgt_out_l
->data_buf
.first_offset
+= firstitem
->datalen
;
264 acked
+= firstitem
->datalen
;
266 databuf_item_unlink(trgt_out_l
, firstitem
);
267 databuf_item_free(firstitem
);
270 trgt_out_l
->data_buf
.totalsize
-= acked
;
272 BUG_ON(trgt_out_l
->data_buf
.totalsize
== 0 &&
273 trgt_out_l
->data_buf
.overhead
!= 0);
275 if (unlikely(trgt_out_l
->data_buf
.cpacket_buffer
!= 0)) {
276 __u32 amount
= acked
< trgt_out_l
->data_buf
.cpacket_buffer
?
277 acked
: trgt_out_l
->data_buf
.cpacket_buffer
;
278 free_cpacket_buffer(amount
);
279 trgt_out_l
->data_buf
.cpacket_buffer
-= amount
;
282 if (trgt_out_l
->sourcetype
== SOURCE_IN
)
283 refresh_speedstat(trgt_out_l
, acked
);
286 void databuf_ackread(struct conn
*cn_l
)
290 while (!list_empty(&(cn_l
->data_buf
.items
)) &&
291 cn_l
->data_buf
.lastread
!= 0) {
292 struct data_buf_item
*firstitem
= container_of(
293 cn_l
->data_buf
.items
.next
,
294 struct data_buf_item
, buf_list
);
296 if (firstitem
== cn_l
->data_buf
.lastread
)
299 acked
+= firstitem
->datalen
;
301 databuf_item_unlink(cn_l
, firstitem
);
302 databuf_item_free(firstitem
);
305 cn_l
->data_buf
.first_offset
+= acked
;
306 cn_l
->data_buf
.totalsize
-= acked
;
308 BUG_ON(cn_l
->data_buf
.totalsize
== 0 && cn_l
->data_buf
.overhead
!= 0);
310 if (unlikely(cn_l
->data_buf
.cpacket_buffer
!= 0)) {
311 __u32 amount
= acked
< cn_l
->data_buf
.cpacket_buffer
?
312 acked
: cn_l
->data_buf
.cpacket_buffer
;
313 free_cpacket_buffer(amount
);
314 cn_l
->data_buf
.cpacket_buffer
-= amount
;
317 if (cn_l
->sourcetype
== SOURCE_IN
)
318 refresh_speedstat(cn_l
, acked
);
/* receive_buf() - append datalen bytes from buf into cn_l's data_buf,
 * either by appending to the current tail item, allocating new
 * data_buf_items, or (when forcecpy permits) taking ownership of the
 * caller's buffer.
 * NOTE(review): this extraction is garbled and many original lines
 * are missing (error paths, loop locals such as 'cpy' and 'rc', and
 * most closing braces); the fragments below are kept as-is. */
321 __s64
receive_buf(struct conn
*cn_l
, char *buf
, __u32 datalen
, __u32 buflen
,
/* buffer to kfree() once its contents have been copied */
324 char *freewhenfinished
= 0;
325 struct data_buf_item
*item
= 0;
/* remember the current tail item so small writes can append to it */
329 if (list_empty(&(cn_l
->data_buf
.items
)) == 0) {
330 struct list_head
*last
= cn_l
->data_buf
.items
.prev
;
331 item
= container_of(last
, struct data_buf_item
, buf_list
);
/* consume datalen bytes in one or more chunks */
334 while (datalen
> 0) {
338 #warning todo convert to bugon and do check on caller
/* refuse to grow the buffer past ~1 GiB of data or overhead */
339 if (unlikely(unlikely(cn_l
->data_buf
.totalsize
+ datalen
>
341 unlikely(cn_l
->data_buf
.overhead
> (1<< 30)))) {
/* small leftover that fits in the tail item: copy instead of
 * keeping the caller's buffer, then free it when finished */
346 if (forcecpy
== 0 && item
!= 0 &&
347 datalen
< (item
->buflen
- item
->datalen
) &&
348 datalen
*2 < (buflen
+
349 sizeof(struct data_buf_item
))) {
351 freewhenfinished
= buf
;
/* no usable tail item: allocate a fresh data_buf_item */
354 if (forcecpy
== 0 || item
== 0 ||
355 item
->buflen
<= item
->datalen
) {
356 item
= kmem_cache_alloc(data_buf_item_slab
, GFP_ATOMIC
);
357 if (unlikely(item
== 0)) {
/* NOTE(review): sizeof(item) is the size of the POINTER, so only
 * 4/8 bytes get zeroed; this almost certainly should be
 * sizeof(*item) / sizeof(struct data_buf_item) — confirm upstream */
362 memset(item
, 0, sizeof(item
));
363 item
->type
= TYPE_BUF
;
367 item
->datalen
= datalen
;
368 item
->buflen
= buflen
;
/* copy path: allocate a right-sized backing buffer */
370 buflen
= buf_optlen(datalen
);
371 item
->buf
= kmalloc(buflen
, GFP_ATOMIC
);
373 if (unlikely(item
->buf
== 0)) {
374 kmem_cache_free(data_buf_item_slab
,
380 item
->buflen
= buflen
;
/* queue the (new) item at the tail and account its overhead */
383 list_add_tail(&(item
->buf_list
),
384 &(cn_l
->data_buf
.items
));
386 cn_l
->data_buf
.overhead
+= item
->buflen
+
387 sizeof(struct data_buf_item
);
/* append as much as fits into the current item */
393 BUG_ON(item
->type
!= TYPE_BUF
);
394 BUG_ON(item
->buflen
<= item
->datalen
);
396 if (item
->buflen
- item
->datalen
< cpy
)
397 cpy
= (item
->buflen
- item
->datalen
);
399 memcpy(item
->buf
+ item
->datalen
, buf
, cpy
);
400 item
->datalen
+= cpy
;
/* update buffer accounting for the bytes just stored */
406 cn_l
->data_buf
.read_remaining
+= cpy
;
407 cn_l
->data_buf
.totalsize
+= cpy
;
408 cn_l
->data_buf
.overhead
-= cpy
;
409 BUG_ON(cn_l
->data_buf
.totalsize
!= 0 &&
410 cn_l
->data_buf
.overhead
== 0);
/* error/cleanup tail (rc is set in lines lost to extraction) */
414 if (unlikely(rc
< 0)) {
421 if (freewhenfinished
!= 0)
422 kfree(freewhenfinished
);
/* receive_cpacketresp() - feed a control-packet response into the
 * connection's data_buf with forcecpy=1; space must already be
 * reserved in cpacket_buffer (checked by the BUG_ON).
 * NOTE(review): the declaration of 'rc' and the check of the
 * receive_buf() result were dropped by extraction. */
427 void receive_cpacketresp(struct conn
*trtg_unconn_l
, char *buf
, int len
)
/* the reservation must cover the data already buffered plus len */
430 BUG_ON(trtg_unconn_l
->data_buf
.cpacket_buffer
<
431 trtg_unconn_l
->data_buf
.totalsize
+ len
);
432 rc
= receive_buf(trtg_unconn_l
, buf
, len
, len
, 1);
/* receive_skb() - queue an skb's payload into src_in_l's data_buf
 * without copying: the data_buf_item embedded in the skb's procstate
 * is linked into the item list and points at skb->data.
 * NOTE(review): the overflow-branch tail and the return statements
 * were dropped by extraction; the success/failure return values
 * cannot be confirmed from this view. */
437 int receive_skb(struct conn
*src_in_l
, struct sk_buff
*skb
)
439 struct skb_procstate
*ps
= skb_pstate(skb
);
440 struct data_buf_item
*item
= &(ps
->funcstate
.rcv
.dbi
);
/* reject if the buffer would exceed ~1 GiB of data or overhead */
442 if (unlikely(unlikely(src_in_l
->data_buf
.totalsize
+ skb
->len
>
443 (1 << 30)) || unlikely(src_in_l
->data_buf
.overhead
>
/* zero-copy: the item references the skb's own data */
447 item
->type
= TYPE_SKB
;
448 item
->buf
= skb
->data
;
449 item
->datalen
= skb
->len
;
450 item
->buflen
= item
->datalen
;
451 list_add_tail(&(item
->buf_list
), &(src_in_l
->data_buf
.items
));
/* account the payload and the sk_buff header as overhead */
453 src_in_l
->data_buf
.read_remaining
+= item
->datalen
;
454 src_in_l
->data_buf
.totalsize
+= item
->datalen
;
455 src_in_l
->data_buf
.overhead
+= sizeof(struct sk_buff
);
/* wake_sender() - notify whoever feeds this connection that buffer
 * space is available again, dispatching on cn->sourcetype under
 * rcv_lock.  Each visible branch drops the lock before (or after)
 * its wakeup action.
 * NOTE(review): the switch's case labels and break statements were
 * dropped by extraction — presumably one branch per source type
 * (parse() for unconnected input, a waitqueue wakeup for
 * SOURCE_SOCK, get_window() for SOURCE_IN) — confirm upstream. */
460 void wake_sender(struct conn
*cn
)
462 unreserve_sock_buffer(cn
);
464 spin_lock_bh(&(cn
->rcv_lock
));
465 switch (cn
->sourcetype
) {
/* branch: unlock, then reparse buffered input of the reverse dir */
467 spin_unlock_bh(&(cn
->rcv_lock
));
468 parse(cn
->reversedir
, 0);
/* branch: wake a local socket writer blocked on buffer space */
471 wake_up_interruptible(&(cn
->source
.sock
.wait
));
472 spin_unlock_bh(&(cn
->rcv_lock
));
/* branch: unlock, then announce a new receive window */
476 spin_unlock_bh(&(cn
->rcv_lock
));
477 get_window(cn
, 0, 0, 0);
/* flush_buf() - push buffered data towards this connection's target,
 * dispatching on cn->targettype under rcv_lock.
 * NOTE(review): case labels, break statements, the declarations of
 * 'rc'/'sent', and the function tail were dropped by extraction; the
 * fragments below are kept as-is. */
484 void flush_buf(struct conn
*cn
)
488 spin_lock_bh(&(cn
->rcv_lock
));
490 switch (cn
->targettype
) {
491 case TARGET_UNCONNECTED
:
492 spin_unlock_bh(&(cn
->rcv_lock
));
/* (presumably TARGET_SOCK) wake the reading socket, unless a
 * delayed flush is active and the buffer is still small */
496 if (cn
->sourcetype
!= SOURCE_SOCK
||
497 cn
->source
.sock
.delay_flush
== 0 ||
498 cn
->data_buf
.totalsize
+
499 cn
->data_buf
.overhead
-
500 cn
->data_buf
.cpacket_buffer
>=
501 BUFFERLIMIT_SOCK_SOCK
/2)
502 wake_up_interruptible(&(cn
->target
.sock
.wait
));
503 spin_unlock_bh(&(cn
->rcv_lock
));
/* (presumably TARGET_OUT) send onto the network and record
 * whether anything actually went out */
506 rc
= flush_out(cn
, 0, 0);
507 spin_unlock_bh(&(cn
->rcv_lock
));
508 sent
= (rc
== RC_FLUSH_CONN_OUT_OK_SENT
);
/* (presumably TARGET_DISCARD) drop everything buffered */
511 databuf_ackdiscard(cn
);
512 spin_unlock_bh(&(cn
->rcv_lock
));
519 refresh_conn_credits(cn
, 0, 0);
526 void __init
forward_init(void)
528 data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
529 sizeof(struct data_buf_item
), 8, 0, 0);
532 MODULE_LICENSE("GPL");