/*
 * Connection oriented routing
 * Copyright (C) 2007-2019 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

struct kmem_cache *data_buf_item_slab;

atomic64_t bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&bufused_sum);
} */

void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}
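
/*
 * bufsize.bufsize is kept left-shifted by BUFSIZE_SHIFT (5), i.e. in 1/32
 * byte units, which lets _bufsize_update() grow or shrink the buffer target
 * by fractions of a byte per received byte.  bufsize_init() seeds it from
 * the plain byte value and starts in BUFSIZE_NOACTION with a budget of
 * bufsize * 4 bytes that has to be received before automatic adjustment
 * starts.
 */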

void bufsize_init(struct conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

#warning todo reset stalled conns on low memory
/*
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset connection which has been in this list longest
 */
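
/*
 * account_bufspace() recomputes the memory charged to this connection
 * (buffered data, allocation overhead and, for SOURCE_IN conns, the reorder
 * queue), folds the difference into the global bufused_sum and returns
 * nonzero when the per-connection or the global budget is exceeded.
 * Callers treat a nonzero return value as "stop accepting more data".
 */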

int account_bufspace(struct conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&bufused_sum) >= BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = update_atomic_sum(&bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cpacket_write_allowed(struct conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			account_bufspace(src_unconn_lx) == 0)
		return 1;

	return 0;
}
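
/*
 * The window advertised to the sending side is derived from the bufsize
 * target: clamped to the per-connection minimum/maximum, reduced when
 * account_bufspace() reports memory pressure, and finally decreased by the
 * data that is already buffered but not yet read (read_remaining).
 */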

void update_windowlimit(struct conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}
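
/*
 * A sender counts as "high latency" when the estimated retransmission
 * latency plus its standard deviation plus the maximum conn-ack delay
 * exceeds 100ms; such connections get the larger thresholds in
 * get_read_remaining_min() below.
 */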

static int bufsize_high_latency_sender(struct conn *cn_lx)
{
	struct neighbor *nb;
	__u32 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}
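
/*
 * _bufsize_update() drives a small per-connection state machine:
 * BUFSIZE_NOACTION consumes act.noact.bytesleft before anything changes,
 * BUFSIZE_DECR/BUFSIZE_DECR_FAST shrink the target (switching to the fast
 * variant once it has fallen below a quarter of act.decr.size_start) and
 * BUFSIZE_INCR/BUFSIZE_INCR_FAST grow it towards act.incr.size_end.
 * bytes_rcvd counts received data and saturates at 2^24 - 1.
 */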

static void _bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			goto out;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u32 speed = 2;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed /= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u32 speed = 2;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (cn_lx->bufsize.bytes_rcvd != (1 << 24) - 1 &&
				cn_lx->bufsize.bytes_rcvd <
				cn_lx->bufsize.bufsize)
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT);
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT + 2));
			}
		}
	} else {
		BUG();
	}

out:
	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}
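
/*
 * get_read_remaining_min() returns the backlog threshold (in bytes) below
 * which a delayed-lowbuf receive is taken as a sign that the buffer target
 * is too small.  It is a fraction of the current target: larger fractions
 * for high latency connections/senders, smaller ones once global buffer
 * usage reaches 3/4 of BUFUSAGE_GLOBAL_MAX.  For example a 65536 byte
 * target on a low latency conn with a low latency sender and no global
 * pressure gives 65536/8 + 1 = 8193 bytes.
 */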

static __u32 get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}
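
/*
 * bufsize_update() runs for every chunk added to the buffer: a received
 * flush cancels a pending decrease, ignore_rcv_lowbuf suppresses lowbuf
 * signals for a while after userspace has been slow to read, and the
 * remaining backlog is compared against the thresholds above to decide
 * whether to start increasing, stop decreasing or decrease more slowly
 * before _bufsize_update() applies the per-byte adjustment.
 */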

static void bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u32 rtt_mul = 2;

			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;

			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_bufsize_update(cn_lx, rcvd, high_latency_sender, high_latency_conn);
}
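
/*
 * Called on the read-to-socket path: if userspace has not emptied the
 * buffer for longer than latency_limit, lowbuf signals are ignored for
 * roughly one buffer worth of data (ignore_rcv_lowbuf), on the assumption
 * that the reader, not the buffer size, is the bottleneck.
 */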

void bufsize_read_to_sock(struct conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}
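
/*
 * data_buf.overhead tracks memory that is allocated but not counted as
 * payload: the unused tail of the kmalloc'd buffer plus the struct
 * data_buf_item for DATABUF_BUF items, and sizeof(struct sk_buff) for
 * zero-copy DATABUF_SKB items.  databuf_item_unlink() reverses exactly
 * what receive_buf()/receive_skb() added.
 */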

static inline void databuf_item_unlink(struct conn *cn_lx,
		struct data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

void databuf_ackdiscard(struct conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);
		freed += item->datalen;

		databuf_item_unlink(cn_lx, item);
		databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void reset_seqno(struct conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

static void databuf_nextreadchunk(struct conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

void databuf_pull(struct conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_lx);
	}
}
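
/*
 * databuf_pull_dbi() and databuf_unpull_dpi() move whole items between the
 * conn buffer and a raw socket: pull_dbi hands the first unread item to the
 * socket and removes it from the buffer accounting, unpull_dpi puts a
 * partially consumed item back at the head of the buffer (or frees it if it
 * was read completely or the socket no longer owns the conn).
 */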

void databuf_unpull_dpi(struct conn *trgt_sock, struct cor_sock *cs,
		struct data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead += sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	return;

free:
	databuf_item_free(item);
}

void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l)
{
	struct data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	databuf_nextreadchunk(trgt_sock_l);

	databuf_item_unlink(trgt_sock_l, dbi);
}

void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

void databuf_pullold(struct conn *trgt_out_l, __u64 startpos, char *dst,
		__u32 len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;
		pos += dbi->datalen;

		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		databuf_item_unlink(trgt_out_l, firstitem);
		databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(trgt_out_l);
}

void databuf_ackread(struct conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		databuf_item_unlink(cn_lx, firstitem);
		databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(cn_lx);
}
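
/*
 * Two receive paths fill the buffer: receive_buf() copies into kmalloc'd
 * chunks (appending to the last item while it still has room), while
 * receive_skb() keeps the skb itself as a zero-copy DATABUF_SKB item unless
 * copying is cheaper than carrying the sk_buff overhead.  Both return the
 * number of bytes actually taken and feed it into bufsize_update().
 */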

__u32 receive_buf(struct conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct data_buf_item *item = 0;
	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				item = 0;
				break;
			}

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	cn_lx->flush = flush;

	account_bufspace(cn_lx);
	bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}

__u32 receive_skb(struct conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct data_buf_item *item = container_of(last,
				struct data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);

		return rc;
	}

	memset(item, 0, sizeof(struct data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	account_bufspace(src_in_l);
	bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}

void wake_sender(struct conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (cn->source.sock.cs != 0 /*&& cor_sock_sndbufavailable(cn)*/)
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		if (likely(cn->source.in.established != 0)) {
			send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
	if (unlikely(data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&bufused_sum, 0);

	return 0;
}

MODULE_LICENSE("GPL");
939 MODULE_LICENSE("GPL");