/**
 * Connection oriented routing
 * Copyright (C) 2007-2019 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/mutex.h>
struct kmem_cache *data_buf_item_slab;

atomic64_t bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&bufused_sum);
} */
void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}
void bufsize_init(struct conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}
#warning todo reset stalled conns on low memory
/*
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset connection which has been in this list longest
 */
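/*
 * Account the buffer space used by this connection (data, overhead and,
 * for SOURCE_IN, reorder memory) into the global bufused_sum. Returns
 * nonzero if the per-connection or global buffer usage limit is exceeded.
 */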
int account_bufspace(struct conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&bufused_sum) >= BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = update_atomic_sum(&bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}
int cpacket_write_allowed(struct conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			account_bufspace(src_unconn_lx) == 0)
		return 1;

	return 0;
}
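/*
 * Recompute the receive window announced for a SOURCE_IN connection from
 * the current buffer size, clamped by the per-connection window limits,
 * by account_bufspace() and by the amount of data still unread.
 */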
void update_windowlimit(struct conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}
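/*
 * A sender counts as high latency if the retransmit latency estimate of the
 * source neighbor plus its standard deviation and the conn ack delay
 * exceeds 100ms.
 */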
static int bufsize_high_latency_sender(struct conn *cn_lx)
{
	struct neighbor *nb;
	__u32 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}
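/*
 * Advance the bufsize state machine by rcvd bytes: in BUFSIZE_NOACTION a
 * byte budget is consumed first, the DECR states shrink the buffer and the
 * INCR states grow it until act.incr.size_end is reached; the speed of the
 * adjustment depends on the *_FAST states and on sender/connection latency.
 */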
static void _bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u32 change;
		__u32 speed = 1;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u32 change;
		__u32 speed = 1;

		if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					(cn_lx->bufsize.bufsize >>
					BUFSIZE_SHIFT);
		}
	}
}
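/*
 * Threshold of unread buffered data below which a delayed/lowbuf receive is
 * treated as a sign that the buffer is too small. The threshold is a
 * fraction of the buffer size; it is smaller when global buffer usage is
 * already high and varies with sender/connection latency.
 */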
static __u32 get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}
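/*
 * Called for every chunk of received data. Handles flush-received and
 * ignore_rcv_lowbuf bookkeeping, switches the bufsize state machine towards
 * increase or decrease depending on how much unread data remains compared
 * to the thresholds from get_read_remaining_min(), and finally applies the
 * per-receive adjustment via _bufsize_update().
 */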
static void bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u32 rtt_mul = 2;
			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul = 4;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_bufsize_update(cn_lx, rcvd, high_latency_sender, high_latency_conn);
}
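/*
 * Called when buffered data is forwarded to a receiving socket. If
 * userspace has not read for longer than a latency-dependent limit,
 * rcv_lowbuf signals are ignored for roughly one buffer's worth of data,
 * and the waiting_for_userspace tracking is updated.
 */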
void bufsize_read_to_sock(struct conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}
static inline void databuf_item_unlink(struct conn *cn_lx,
		struct data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}
void databuf_ackdiscard(struct conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);
		freed += item->datalen;

		databuf_item_unlink(cn_lx, item);
		databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}
void reset_seqno(struct conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}
static void databuf_nextreadchunk(struct conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}
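/*
 * Copy len bytes of buffered data to dst, advancing nextread and
 * next_read_offset chunk by chunk.
 */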
void databuf_pull(struct conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		__u32 cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_lx);
	}
}
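/*
 * Return a data_buf_item that was handed to a socket but not fully consumed:
 * it is re-inserted at the head of the item list and the buffer counters are
 * restored; if nothing unread is left, the item is freed instead.
 */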
void databuf_unpull_dpi(struct conn *trgt_sock, struct cor_sock *cs,
		struct data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead += sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	return;

free:
	databuf_item_free(item);
}
void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l)
{
	struct data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	databuf_nextreadchunk(trgt_sock_l);

	databuf_item_unlink(trgt_sock_l, dbi);
}
void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}
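/*
 * Copy len bytes starting at absolute buffer position startpos into dst,
 * without changing the read position; this allows re-reading data that is
 * still buffered, e.g. when it has to be retransmitted.
 */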
void databuf_pullold(struct conn *trgt_out_l, __u64 startpos, char *dst,
		__u32 len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		__u32 cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;
		pos += dbi->datalen;

		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}
/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		databuf_item_unlink(trgt_out_l, firstitem);
		databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	account_bufspace(trgt_out_l);
}
void databuf_ackread(struct conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		databuf_item_unlink(cn_lx, firstitem);
		databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	account_bufspace(cn_lx);
}
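/*
 * Append datalen bytes from buf to the connection's data_buf, filling up the
 * tail DATABUF_BUF item when possible and allocating new items otherwise.
 * Returns the number of bytes actually buffered, which can be less than
 * datalen if an allocation fails.
 */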
__u32 receive_buf(struct conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct data_buf_item *item = 0;
	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				break;
			}

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	cn_lx->flush = flush;

	account_bufspace(cn_lx);
	bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}
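/*
 * Queue a received skb: if it is small enough its payload is copied into the
 * data_buf via receive_buf(), otherwise the skb itself is linked in as a
 * DATABUF_SKB item. Returns the number of bytes accepted.
 */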
__u32 receive_skb(struct conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct data_buf_item *item = container_of(last,
				struct data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);

		return rc;
	}

	memset(item, 0, sizeof(struct data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	account_bufspace(src_in_l);
	bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}
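/*
 * Tell the sending side that buffer space became available: control packet
 * sources get proc_cpacket() run on the reverse direction, socket sources
 * are woken via cor_sk_write_space(), and SOURCE_IN connections get a conn
 * ack sent if needed.
 */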
void wake_sender(struct conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (cn->source.sock.cs != 0 /*&& cor_sock_sndbufavailable(cn)*/)
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		if (likely(cn->source.in.established != 0)) {
			send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}
int __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
	if (unlikely(data_buf_item_slab == 0))
		return 1;

	atomic64_set(&bufused_sum, 0);

	return 0;
}

MODULE_LICENSE("GPL");