/*
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
16 #include <linux/mutex.h>
20 struct kmem_cache
*cor_data_buf_item_slab
;
22 atomic64_t cor_bufused_sum
;
/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */
29 void cor_databuf_init(struct cor_conn
*cn_init
)
31 memset(&(cn_init
->data_buf
), 0, sizeof(cn_init
->data_buf
));
32 INIT_LIST_HEAD(&(cn_init
->data_buf
.items
));
35 void cor_bufsize_init(struct cor_conn
*cn_l
, __u32 bufsize
)
37 __u32 bufsize_shifted
;
39 memset(&(cn_l
->bufsize
), 0, sizeof(cn_l
->bufsize
));
41 if (unlikely((bufsize
>> (32 - BUFSIZE_SHIFT
)) != 0))
42 bufsize_shifted
= U32_MAX
;
44 bufsize_shifted
= bufsize
<< 5;
46 cn_l
->bufsize
.bufsize
= bufsize_shifted
;
47 cn_l
->bufsize
.state
= BUFSIZE_NOACTION
;
48 cn_l
->bufsize
.act
.noact
.bytesleft
= bufsize
* 4;
51 int cor_account_bufspace(struct cor_conn
*cn_lx
)
53 __u64 space_needed
= 0;
55 __u64 bufused_sum_int
;
57 if (likely(cn_lx
->isreset
== 0)) {
58 space_needed
+= cn_lx
->data_buf
.datasize
;
59 space_needed
+= cn_lx
->data_buf
.overhead
;
60 if (cn_lx
->sourcetype
== SOURCE_IN
) {
61 space_needed
+= cn_lx
->src
.in
.reorder_memused
;
65 if (cn_lx
->bufspace_accounted
== space_needed
) {
66 if (space_needed
>= BUFUSAGE_PER_CONN_MAX
) {
68 } else if (atomic64_read(&cor_bufused_sum
) >=
69 BUFUSAGE_GLOBAL_MAX
) {
76 if (unlikely(space_needed
>= U32_MAX
))
79 space_req
= space_needed
;
81 if (cn_lx
->bufspace_accounted
== space_req
)
84 bufused_sum_int
= cor_update_atomic_sum(&cor_bufused_sum
,
85 cn_lx
->bufspace_accounted
, space_req
);
87 cn_lx
->bufspace_accounted
= space_req
;
89 if (cn_lx
->targettype
== TARGET_OUT
&& unlikely(
90 unlikely(cn_lx
->trgt
.out
.in_nb_busy_list
== 0) !=
91 unlikely(cn_lx
->data_buf
.datasize
== 0)))
92 cor_conn_set_last_act(cn_lx
);
94 if (space_needed
!= space_req
)
96 else if (bufused_sum_int
>= BUFUSAGE_GLOBAL_MAX
)
98 else if (space_needed
>= BUFUSAGE_PER_CONN_MAX
)
104 int cor_conn_src_unconn_write_allowed(struct cor_conn
*src_unconn_lx
)
106 BUG_ON(src_unconn_lx
->sourcetype
!= SOURCE_UNCONNECTED
);
108 if (src_unconn_lx
->data_buf
.datasize
== 0)
110 else if (src_unconn_lx
->data_buf
.datasize
< BUFFERLIMIT_SRC_UNCONN
&&
111 cor_account_bufspace(src_unconn_lx
) == 0)
117 void cor_update_windowlimit(struct cor_conn
*src_in_lx
)
121 BUG_ON(src_in_lx
->sourcetype
!= SOURCE_IN
);
123 bufsize
= src_in_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
125 if (src_in_lx
->targettype
!= TARGET_OUT
) {
126 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
127 cor_account_bufspace(src_in_lx
)) {
128 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
130 } else if (cor_seqno_before_eq(src_in_lx
->trgt
.out
.seqno_windowlimit
,
131 src_in_lx
->trgt
.out
.seqno_nextsend
)) {
132 if (cor_account_bufspace(src_in_lx
)) {
133 bufsize
= min(bufsize
, (__u32
) WINDOW_MAX_PER_CONN_MIN
);
136 __u32 windowleft
= (__u32
) min((__u64
) U32_MAX
,
138 src_in_lx
->trgt
.out
.seqno_windowlimit
-
139 src_in_lx
->trgt
.out
.seqno_nextsend
));
141 bufsize
= max(bufsize
, min(windowleft
,
142 (__u32
) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK
));
144 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
145 cor_account_bufspace(src_in_lx
)) {
146 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
150 if (bufsize
> WINDOW_MAX_PER_CONN_MAX
)
151 bufsize
= WINDOW_MAX_PER_CONN_MAX
;
153 /* printk(KERN_ERR "window %p %u %u\n", src_in_lx, bufsize,
154 src_in_lx->data_buf.read_remaining); */
156 if (unlikely(src_in_lx
->data_buf
.read_remaining
> bufsize
))
159 bufsize
-= src_in_lx
->data_buf
.read_remaining
;
161 if (unlikely(src_in_lx
->targettype
== TARGET_DISCARD
))
164 src_in_lx
->src
.in
.window_seqnolimit
=
165 src_in_lx
->src
.in
.next_seqno
+ bufsize
;
168 static int cor_bufsize_high_latency_sender(struct cor_conn
*cn_lx
)
170 struct cor_neighbor
*nb
;
174 if (cn_lx
->sourcetype
!= SOURCE_IN
)
177 nb
= cn_lx
->src
.in
.nb
;
178 if (unlikely(nb
== 0))
181 latency_us
= atomic_read(&(nb
->latency_retrans_us
));
182 latency_us
+= atomic_read(&(nb
->latency_stddev_retrans_us
));
183 latency_us
+= CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS
* 1000;
185 return latency_us
> 100000 ? 1 : 0;
188 __u8
_cor_bufsize_update_get_changerate(struct cor_conn
*cn_lx
)
190 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
191 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
195 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
197 } else if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
198 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
201 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
203 if (high_latency_sender
)
205 if (high_latency_conn
)
208 changerate
= 128 - speed
;
209 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
210 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
213 if (high_latency_sender
)
216 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
218 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
221 changerate
= 128 + speed
;
226 /* printk(KERN_ERR "changerate1 %u\n", changerate); */
228 if (cn_lx
->targettype
== TARGET_OUT
) {
229 __u16 remote_changerate
= ((__u16
)
230 cn_lx
->trgt
.out
.remote_bufsize_changerate
) + 64;
231 /* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
232 changerate
= (changerate
* remote_changerate
) / 128;
233 /* printk(KERN_ERR "changerate3 %u\n", changerate); */
236 if (unlikely(changerate
< 64))
238 else if (unlikely(changerate
- 64 >= 256))
241 return (__u8
) (changerate
- 64);
244 static void _cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
245 int high_latency_sender
, int high_latency_conn
)
247 /* speed of increase/decrease changes with BUFSIZE_SHIFT */
248 BUG_ON(BUFSIZE_SHIFT
!= 5);
251 * If you change the speed here, change it in
252 * _cor_bufsize_update_get_changerate too
255 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
256 if (likely(cn_lx
->bufsize
.act
.noact
.bytesleft
>= rcvd
)) {
257 cn_lx
->bufsize
.act
.noact
.bytesleft
-= rcvd
;
261 rcvd
-= cn_lx
->bufsize
.act
.noact
.bytesleft
;
262 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
263 cn_lx
->bufsize
.act
.decr
.size_start
= cn_lx
->bufsize
.bufsize
;
266 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
267 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
271 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
274 if (high_latency_sender
)
277 change
= rcvd
* speed
;
278 if (high_latency_conn
)
281 if (cn_lx
->bufsize
.bufsize
< change
)
282 cn_lx
->bufsize
.bufsize
= 0;
284 cn_lx
->bufsize
.bufsize
-= change
;
286 if (cn_lx
->bufsize
.act
.decr
.size_start
/4 >
287 cn_lx
->bufsize
.bufsize
)
288 cn_lx
->bufsize
.state
= BUFSIZE_DECR_FAST
;
289 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
290 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
294 if (high_latency_sender
)
297 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
299 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
302 change
= rcvd
* speed
;
303 if (cn_lx
->bufsize
.bufsize
+ change
< cn_lx
->bufsize
.bufsize
)
304 cn_lx
->bufsize
.bufsize
= U32_MAX
;
306 cn_lx
->bufsize
.bufsize
+= change
;
308 if (cn_lx
->bufsize
.bufsize
>=
309 cn_lx
->bufsize
.act
.incr
.size_end
) {
310 cn_lx
->bufsize
.bufsize
=
311 cn_lx
->bufsize
.act
.incr
.size_end
;
312 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
313 if (high_latency_conn
) {
314 cn_lx
->bufsize
.act
.noact
.bytesleft
=
315 (cn_lx
->bufsize
.bufsize
>>
318 cn_lx
->bufsize
.act
.noact
.bytesleft
=
319 (cn_lx
->bufsize
.bufsize
>>
327 if (unlikely(rcvd
>= (1 << 24)) ||
328 cn_lx
->bufsize
.bytes_rcvd
+ rcvd
>= (1 << 24))
329 cn_lx
->bufsize
.bytes_rcvd
= (1 << 24) - 1;
331 cn_lx
->bufsize
.bytes_rcvd
+= rcvd
;
334 static __u32
cor_get_read_remaining_min(__u32 bufsize_bytes
,
335 int high_latency_sender
, int high_latency_conn
)
337 int bufspace_low
= (atomic64_read(&cor_bufused_sum
) >=
338 3*(BUFUSAGE_GLOBAL_MAX
/4));
340 if (high_latency_conn
) {
341 if (high_latency_sender
) {
343 return bufsize_bytes
/6 + 1;
345 return bufsize_bytes
/3 + 1;
349 return bufsize_bytes
/8 + 1;
351 return bufsize_bytes
/4 + 1;
355 if (high_latency_sender
) {
357 return bufsize_bytes
/6 + 1;
359 return bufsize_bytes
/4 + 1;
363 return bufsize_bytes
/12 + 1;
365 return bufsize_bytes
/8 + 1;
371 static void cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
372 __u8 windowused
, __u8 rcv_flushrcvd
)
374 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
375 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
376 __u32 bufsize_bytes
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
377 __u32 read_remaining_min
= cor_get_read_remaining_min(bufsize_bytes
,
378 high_latency_sender
, high_latency_conn
);
379 __u32 read_remaining_min_nofastdecr
= read_remaining_min
*2;
380 __u32 read_remaining_min_nodecr
= (read_remaining_min
+
381 read_remaining_min_nofastdecr
)/2;
382 __u32 read_remaining
;
384 BUG_ON(cn_lx
->data_buf
.read_remaining
< rcvd
);
385 BUG_ON(windowused
> 31);
387 if (cn_lx
->bufsize
.ignore_rcv_lowbuf
> 0) {
388 if (rcvd
> cn_lx
->bufsize
.ignore_rcv_lowbuf
)
389 cn_lx
->bufsize
.ignore_rcv_lowbuf
= 0;
391 cn_lx
->bufsize
.ignore_rcv_lowbuf
-= rcvd
;
393 read_remaining
= bufsize_bytes
;
395 read_remaining
= max(cn_lx
->data_buf
.read_remaining
- rcvd
,
396 (bufsize_bytes
* (31 - windowused
)) / 31);
399 if (rcv_flushrcvd
!= 0) {
400 __u32 bytesleft
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
401 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
402 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
403 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
404 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
405 } else if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
&&
406 cn_lx
->bufsize
.act
.noact
.bytesleft
<
408 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
412 if (read_remaining
< read_remaining_min
) {
413 __u32 buf_increase_bytes
= read_remaining_min
- read_remaining
;
417 if (high_latency_sender
) {
418 if (buf_increase_bytes
< rcvd
/16)
419 buf_increase_bytes
= rcvd
/16;
421 if (buf_increase_bytes
< rcvd
/32)
422 buf_increase_bytes
= rcvd
/32;
425 buf_increase
= (buf_increase_bytes
<< BUFSIZE_SHIFT
);
426 if (unlikely((buf_increase
>> BUFSIZE_SHIFT
) !=
428 buf_increase
= U32_MAX
;
430 bufsize_end
= cn_lx
->bufsize
.bufsize
+ buf_increase
;
431 if (unlikely(bufsize_end
< cn_lx
->bufsize
.bufsize
))
432 bufsize_end
= U32_MAX
;
434 if (cn_lx
->bufsize
.state
!= BUFSIZE_INCR
&&
435 cn_lx
->bufsize
.state
!= BUFSIZE_INCR_FAST
) {
436 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
437 cn_lx
->bufsize
.act
.incr
.size_start
=
438 cn_lx
->bufsize
.bufsize
;
439 cn_lx
->bufsize
.act
.incr
.size_end
= 0;
442 if (bufsize_end
> cn_lx
->bufsize
.act
.incr
.size_end
)
443 cn_lx
->bufsize
.act
.incr
.size_end
= bufsize_end
;
444 if (bufsize_end
/4 > cn_lx
->bufsize
.act
.incr
.size_start
)
445 cn_lx
->bufsize
.state
= BUFSIZE_INCR_FAST
;
446 } else if (read_remaining
< read_remaining_min_nodecr
) {
447 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
||
448 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
449 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
452 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
)
453 bytesleft
= cn_lx
->bufsize
.act
.noact
.bytesleft
;
454 if (high_latency_conn
)
456 if (likely((cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
) *
457 rtt_mul
> bytesleft
))
458 bytesleft
= (cn_lx
->bufsize
.bufsize
>>
459 BUFSIZE_SHIFT
) * rtt_mul
;
461 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
462 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
464 } else if (read_remaining
< read_remaining_min_nofastdecr
) {
465 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
466 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
467 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
468 cn_lx
->bufsize
.act
.decr
.size_start
=
469 cn_lx
->bufsize
.bufsize
;
473 if (cn_lx
->targettype
== TARGET_OUT
) {
474 __u16 rem_changerate
= ((__u16
)
475 cn_lx
->trgt
.out
.remote_bufsize_changerate
) + 64;
476 __u16 crate_nofastdecr
;
478 __u16 crate_nofastincr
;
481 if (high_latency_conn
) {
482 crate_nofastdecr
= 128 - 128/8;
483 crate_nodecr
= 128 - 128/6;
484 crate_nofastincr
= 128 + 128/4;
485 crate_noincr
= 128 + 128/3;
487 crate_nofastdecr
= 128 - 128/16;
488 crate_nodecr
= 128 - 128/12;
489 crate_nofastincr
= 128 + 128/8;
490 crate_noincr
= 128 + 128/6;
493 if ((rem_changerate
< crate_nodecr
||
494 rem_changerate
> crate_noincr
) &&
495 cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
496 cn_lx
->bufsize
.act
.noact
.bytesleft
= max(
497 cn_lx
->bufsize
.act
.noact
.bytesleft
,
498 cn_lx
->bufsize
.bufsize
>>
502 if (rem_changerate
< crate_nodecr
&& (
503 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
504 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)) {
505 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
506 cn_lx
->bufsize
.act
.noact
.bytesleft
=
507 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
509 if (rem_changerate
< crate_nofastdecr
&&
510 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
511 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
512 cn_lx
->bufsize
.act
.decr
.size_start
=
513 cn_lx
->bufsize
.bufsize
;
516 if (rem_changerate
> crate_noincr
&& (
517 cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
518 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)) {
519 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
520 cn_lx
->bufsize
.act
.noact
.bytesleft
=
521 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
523 if (rem_changerate
> crate_nofastincr
&&
524 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
525 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
526 cn_lx
->bufsize
.act
.incr
.size_start
=
527 cn_lx
->bufsize
.bufsize
;
531 _cor_bufsize_update(cn_lx
, rcvd
, high_latency_sender
,
535 void cor_bufsize_read_to_sock(struct cor_conn
*trgt_sock_lx
)
537 unsigned long jiffies_tmp
= jiffies
;
538 __u32 latency_limit
= (trgt_sock_lx
->is_highlatency
!= 0 ?
541 if (trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
!= 0 && time_before(
542 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace_since
,
543 jiffies
- latency_limit
)) {
544 trgt_sock_lx
->bufsize
.ignore_rcv_lowbuf
=
545 trgt_sock_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
548 if (trgt_sock_lx
->data_buf
.read_remaining
== 0) {
549 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
= 0;
551 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
= 1;
552 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace_since
=
557 static inline void cor_databuf_item_unlink(struct cor_conn
*cn_lx
,
558 struct cor_data_buf_item
*item
)
560 BUG_ON(item
== cn_lx
->data_buf
.nextread
);
561 list_del(&(item
->buf_list
));
562 if (item
->type
== DATABUF_BUF
) {
563 cn_lx
->data_buf
.overhead
-= sizeof(struct cor_data_buf_item
) +
564 item
->buflen
- item
->datalen
;
565 } else if (item
->type
== DATABUF_SKB
) {
566 cn_lx
->data_buf
.overhead
-= sizeof(struct sk_buff
);
572 void cor_databuf_ackdiscard(struct cor_conn
*cn_lx
)
576 cn_lx
->data_buf
.next_read_offset
= 0;
577 cn_lx
->data_buf
.nextread
= 0;
579 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
580 struct cor_data_buf_item
*item
= container_of(
581 cn_lx
->data_buf
.items
.next
,
582 struct cor_data_buf_item
, buf_list
);
583 freed
+= item
->datalen
;
585 cor_databuf_item_unlink(cn_lx
, item
);
586 cor_databuf_item_free(item
);
589 cn_lx
->data_buf
.datasize
-= freed
;
590 cn_lx
->data_buf
.first_offset
+= freed
;
592 BUG_ON(cn_lx
->data_buf
.datasize
!= 0);
593 BUG_ON(cn_lx
->data_buf
.overhead
!= 0);
595 cn_lx
->data_buf
.read_remaining
= 0;
598 void cor_reset_seqno(struct cor_conn
*cn_l
, __u64 initseqno
)
600 cn_l
->data_buf
.first_offset
= initseqno
-
601 cn_l
->data_buf
.datasize
+
602 cn_l
->data_buf
.read_remaining
;
605 static void cor_databuf_nextreadchunk(struct cor_conn
*cn_lx
)
607 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
608 BUG_ON(cn_lx
->data_buf
.next_read_offset
!=
609 cn_lx
->data_buf
.nextread
->datalen
);
611 if (&(cn_lx
->data_buf
.nextread
->buf_list
) ==
612 cn_lx
->data_buf
.items
.prev
) {
613 BUG_ON(cn_lx
->data_buf
.read_remaining
!= 0);
614 cn_lx
->data_buf
.nextread
= 0;
617 BUG_ON(cn_lx
->data_buf
.read_remaining
== 0);
618 cn_lx
->data_buf
.nextread
= container_of(
619 cn_lx
->data_buf
.nextread
->buf_list
.next
,
620 struct cor_data_buf_item
, buf_list
);
623 cn_lx
->data_buf
.next_read_offset
= 0;
626 void cor_databuf_pull(struct cor_conn
*cn_lx
, char *dst
, __u32 len
)
628 BUG_ON(cn_lx
->data_buf
.read_remaining
< len
);
633 char *srcbufcpystart
= 0;
634 int srcbufcpylen
= 0;
636 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
637 BUG_ON(cn_lx
->data_buf
.next_read_offset
>=
638 cn_lx
->data_buf
.nextread
->datalen
);
640 srcbufcpystart
= cn_lx
->data_buf
.nextread
->buf
+
641 cn_lx
->data_buf
.next_read_offset
;
642 srcbufcpylen
= cn_lx
->data_buf
.nextread
->datalen
-
643 cn_lx
->data_buf
.next_read_offset
;
645 if (cpy
> srcbufcpylen
)
648 memcpy(dst
, srcbufcpystart
, cpy
);
653 cn_lx
->data_buf
.read_remaining
-= cpy
;
654 cn_lx
->data_buf
.next_read_offset
+= cpy
;
656 if (cpy
== srcbufcpylen
)
657 cor_databuf_nextreadchunk(cn_lx
);
661 void cor_databuf_unpull_dpi(struct cor_conn
*trgt_sock
, struct cor_sock
*cs
,
662 struct cor_data_buf_item
*item
, __u16 next_read_offset
)
664 BUG_ON(next_read_offset
> item
->datalen
);
666 if (next_read_offset
>= item
->datalen
)
669 spin_lock_bh(&(trgt_sock
->rcv_lock
));
671 if (unlikely(cor_is_trgt_sock(trgt_sock
, cs
) == 0)) {
672 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
676 BUG_ON(trgt_sock
->data_buf
.nextread
!= 0 &&
677 &(trgt_sock
->data_buf
.nextread
->buf_list
) !=
678 trgt_sock
->data_buf
.items
.next
);
679 BUG_ON(trgt_sock
->data_buf
.next_read_offset
!= 0);
681 trgt_sock
->data_buf
.first_offset
-= item
->datalen
;
682 trgt_sock
->data_buf
.datasize
+= item
->datalen
;
683 trgt_sock
->data_buf
.read_remaining
+= item
->datalen
- next_read_offset
;
685 if (item
->type
== DATABUF_BUF
) {
686 trgt_sock
->data_buf
.overhead
+=
687 sizeof(struct cor_data_buf_item
) +
688 item
->buflen
- item
->datalen
;
689 } else if (item
->type
== DATABUF_SKB
) {
690 trgt_sock
->data_buf
.overhead
+= sizeof(struct sk_buff
);
695 list_add(&(item
->buf_list
), &(trgt_sock
->data_buf
.items
));
697 trgt_sock
->data_buf
.nextread
= item
;
698 trgt_sock
->data_buf
.next_read_offset
= next_read_offset
;
700 cor_account_bufspace(trgt_sock
);
702 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
706 cor_databuf_item_free(item
);
710 void cor_databuf_pull_dbi(struct cor_sock
*cs_rl
, struct cor_conn
*trgt_sock_l
)
712 struct cor_data_buf_item
*dbi
= 0;
713 BUG_ON(cs_rl
->type
!= CS_TYPE_CONN_RAW
);
714 BUG_ON(cs_rl
->data
.conn_raw
.rcvitem
!= 0);
716 if (trgt_sock_l
->data_buf
.read_remaining
== 0)
719 BUG_ON(trgt_sock_l
->data_buf
.nextread
== 0);
720 BUG_ON(trgt_sock_l
->data_buf
.next_read_offset
>=
721 trgt_sock_l
->data_buf
.nextread
->datalen
);
722 dbi
= trgt_sock_l
->data_buf
.nextread
;
724 BUG_ON(&(dbi
->buf_list
) != trgt_sock_l
->data_buf
.items
.next
);
726 cs_rl
->data
.conn_raw
.rcvitem
= dbi
;
727 cs_rl
->data
.conn_raw
.rcvoffset
= trgt_sock_l
->data_buf
.next_read_offset
;
729 trgt_sock_l
->data_buf
.first_offset
+= dbi
->datalen
;
730 trgt_sock_l
->data_buf
.datasize
-= dbi
->datalen
;
731 trgt_sock_l
->data_buf
.read_remaining
-= dbi
->datalen
;
733 cor_account_bufspace(trgt_sock_l
);
735 trgt_sock_l
->data_buf
.next_read_offset
= dbi
->datalen
;
736 cor_databuf_nextreadchunk(trgt_sock_l
);
738 cor_databuf_item_unlink(trgt_sock_l
, dbi
);
741 void cor_databuf_unpull(struct cor_conn
*trgt_out_l
, __u32 bytes
)
743 trgt_out_l
->data_buf
.read_remaining
+= bytes
;
745 BUG_ON(list_empty(&(trgt_out_l
->data_buf
.items
)) != 0);
747 if (trgt_out_l
->data_buf
.nextread
== 0) {
748 BUG_ON(trgt_out_l
->data_buf
.next_read_offset
!= 0);
750 trgt_out_l
->data_buf
.nextread
= container_of(
751 trgt_out_l
->data_buf
.items
.prev
,
752 struct cor_data_buf_item
, buf_list
);
755 while (bytes
> trgt_out_l
->data_buf
.next_read_offset
) {
756 bytes
-= trgt_out_l
->data_buf
.next_read_offset
;
757 trgt_out_l
->data_buf
.nextread
= container_of(
758 trgt_out_l
->data_buf
.nextread
->buf_list
.prev
,
759 struct cor_data_buf_item
, buf_list
);
760 BUG_ON(&(trgt_out_l
->data_buf
.nextread
->buf_list
) ==
761 &(trgt_out_l
->data_buf
.items
));
762 trgt_out_l
->data_buf
.next_read_offset
=
763 trgt_out_l
->data_buf
.nextread
->datalen
;
766 trgt_out_l
->data_buf
.next_read_offset
-= bytes
;
769 void cor_databuf_pullold(struct cor_conn
*trgt_out_l
, __u64 startpos
, char *dst
,
772 __u64 pos
= trgt_out_l
->data_buf
.first_offset
;
773 struct cor_data_buf_item
*dbi
= container_of(
774 trgt_out_l
->data_buf
.items
.next
,
775 struct cor_data_buf_item
, buf_list
);
778 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
780 if (cor_seqno_after(pos
+ dbi
->datalen
, startpos
))
784 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
791 char *srcbufcpystart
= 0;
792 int srcbufcpylen
= 0;
794 __u64 offset
= cor_seqno_clean(startpos
- pos
);
796 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
798 BUG_ON(cor_seqno_before(startpos
, pos
));
799 BUG_ON(offset
> dbi
->datalen
);
801 srcbufcpystart
= dbi
->buf
+ offset
;
802 srcbufcpylen
= dbi
->datalen
- offset
;
804 if (cpy
> srcbufcpylen
)
807 memcpy(dst
, srcbufcpystart
, cpy
);
814 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
819 /* ack up to *not* including pos */
820 void cor_databuf_ack(struct cor_conn
*trgt_out_l
, __u64 pos
)
824 while (!list_empty(&(trgt_out_l
->data_buf
.items
))) {
825 struct cor_data_buf_item
*firstitem
= container_of(
826 trgt_out_l
->data_buf
.items
.next
,
827 struct cor_data_buf_item
, buf_list
);
829 if (firstitem
== trgt_out_l
->data_buf
.nextread
)
832 if (cor_seqno_after_eq(trgt_out_l
->data_buf
.first_offset
+
833 firstitem
->datalen
, pos
))
836 trgt_out_l
->data_buf
.first_offset
+= firstitem
->datalen
;
837 acked
+= firstitem
->datalen
;
839 cor_databuf_item_unlink(trgt_out_l
, firstitem
);
840 cor_databuf_item_free(firstitem
);
843 trgt_out_l
->data_buf
.datasize
-= acked
;
845 BUG_ON(trgt_out_l
->data_buf
.datasize
== 0 &&
846 trgt_out_l
->data_buf
.overhead
!= 0);
848 if (unlikely(trgt_out_l
->trgt
.out
.nblist_busy_remaining
<= acked
)) {
849 trgt_out_l
->trgt
.out
.nblist_busy_remaining
= 0;
850 cor_conn_set_last_act(trgt_out_l
);
852 trgt_out_l
->trgt
.out
.nblist_busy_remaining
-= acked
;
856 cor_account_bufspace(trgt_out_l
);
859 void cor_databuf_ackread(struct cor_conn
*cn_lx
)
863 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
864 struct cor_data_buf_item
*firstitem
= container_of(
865 cn_lx
->data_buf
.items
.next
,
866 struct cor_data_buf_item
, buf_list
);
868 if (firstitem
== cn_lx
->data_buf
.nextread
)
871 acked
+= firstitem
->datalen
;
873 cor_databuf_item_unlink(cn_lx
, firstitem
);
874 cor_databuf_item_free(firstitem
);
877 cn_lx
->data_buf
.datasize
-= acked
;
878 cn_lx
->data_buf
.first_offset
+= acked
;
880 BUG_ON(cn_lx
->data_buf
.datasize
== 0 && cn_lx
->data_buf
.overhead
!= 0);
882 if (cn_lx
->targettype
== TARGET_OUT
) {
883 if (unlikely(cn_lx
->trgt
.out
.nblist_busy_remaining
<=
885 cn_lx
->trgt
.out
.nblist_busy_remaining
= 0;
886 cor_conn_set_last_act(cn_lx
);
888 cn_lx
->trgt
.out
.nblist_busy_remaining
-= acked
;
893 cor_account_bufspace(cn_lx
);
896 __u32
_cor_receive_buf(struct cor_conn
*cn_lx
, char *buf
, __u32 datalen
,
897 int from_sock
, __u8 windowused
, __u8 flush
)
899 struct cor_data_buf_item
*item
= 0;
903 if (list_empty(&(cn_lx
->data_buf
.items
)) == 0) {
904 struct list_head
*last
= cn_lx
->data_buf
.items
.prev
;
905 item
= container_of(last
, struct cor_data_buf_item
, buf_list
);
908 while (datalen
> 0) {
911 BUG_ON(cn_lx
->data_buf
.datasize
+ datalen
> (1 << 30));
912 BUG_ON(cn_lx
->data_buf
.overhead
> (1 << 30));
914 if (item
== 0 || item
->type
!= DATABUF_BUF
||
915 item
->buflen
<= item
->datalen
) {
916 item
= kmem_cache_alloc(cor_data_buf_item_slab
,
918 if (unlikely(item
== 0))
921 memset(item
, 0, sizeof(struct cor_data_buf_item
));
922 item
->type
= DATABUF_BUF
;
924 item
->buflen
= cor_buf_optlen(datalen
, from_sock
);
925 item
->buf
= kmalloc(item
->buflen
, GFP_ATOMIC
);
927 if (unlikely(item
->buf
== 0)) {
928 kmem_cache_free(cor_data_buf_item_slab
, item
);
933 list_add_tail(&(item
->buf_list
),
934 &(cn_lx
->data_buf
.items
));
936 cn_lx
->data_buf
.overhead
+= item
->buflen
+
937 sizeof(struct cor_data_buf_item
);
940 BUG_ON(item
->type
!= DATABUF_BUF
);
941 BUG_ON(item
->buflen
<= item
->datalen
);
943 if (cn_lx
->data_buf
.nextread
== 0) {
944 cn_lx
->data_buf
.nextread
= item
;
945 cn_lx
->data_buf
.next_read_offset
= item
->datalen
;
948 if (item
->buflen
- item
->datalen
< cpy
)
949 cpy
= (item
->buflen
- item
->datalen
);
951 memcpy(item
->buf
+ item
->datalen
, buf
, cpy
);
952 item
->datalen
+= cpy
;
954 BUG_ON(cpy
> datalen
);
959 cn_lx
->data_buf
.read_remaining
+= cpy
;
960 cn_lx
->data_buf
.datasize
+= cpy
;
961 cn_lx
->data_buf
.overhead
-= cpy
;
962 BUG_ON(cn_lx
->data_buf
.datasize
!= 0 &&
963 cn_lx
->data_buf
.overhead
== 0);
968 cn_lx
->flush
= flush
;
970 cor_account_bufspace(cn_lx
);
971 cor_bufsize_update(cn_lx
, totalcpy
, windowused
, flush
);
976 __u32
cor_receive_skb(struct cor_conn
*src_in_l
, struct sk_buff
*skb
,
977 __u8 windowused
, __u8 flush
)
979 struct cor_skb_procstate
*ps
= cor_skb_pstate(skb
);
980 struct cor_data_buf_item
*item
= &(ps
->funcstate
.rcv
.dbi
);
982 __u32 bufferleft
= 0;
984 BUG_ON(skb
->len
<= 0);
986 if (unlikely(unlikely(src_in_l
->data_buf
.datasize
+ skb
->len
>
987 (1 << 30)) || unlikely(src_in_l
->data_buf
.overhead
>
991 if (list_empty(&(src_in_l
->data_buf
.items
)) == 0) {
992 struct list_head
*last
= src_in_l
->data_buf
.items
.prev
;
993 struct cor_data_buf_item
*item
= container_of(last
,
994 struct cor_data_buf_item
, buf_list
);
995 bufferleft
= item
->buflen
- item
->datalen
;
998 if (skb
->len
< (sizeof(struct sk_buff
) + bufferleft
)) {
999 __u32 rc
= cor_receive_buf(src_in_l
, skb
->data
, skb
->len
,
1001 if (likely(rc
== skb
->len
))
1006 memset(item
, 0, sizeof(struct cor_data_buf_item
));
1008 item
->type
= DATABUF_SKB
;
1009 item
->buf
= skb
->data
;
1010 item
->datalen
= skb
->len
;
1011 item
->buflen
= item
->datalen
;
1012 list_add_tail(&(item
->buf_list
), &(src_in_l
->data_buf
.items
));
1013 if (src_in_l
->data_buf
.nextread
== 0)
1014 src_in_l
->data_buf
.nextread
= item
;
1016 src_in_l
->data_buf
.read_remaining
+= item
->datalen
;
1017 src_in_l
->data_buf
.datasize
+= item
->datalen
;
1018 src_in_l
->data_buf
.overhead
+= sizeof(struct sk_buff
);
1020 cor_account_bufspace(src_in_l
);
1021 cor_bufsize_update(src_in_l
, skb
->len
, windowused
, flush
);
1023 src_in_l
->flush
= flush
;
1028 void cor_wake_sender(struct cor_conn
*cn
)
1030 spin_lock_bh(&(cn
->rcv_lock
));
1032 if (unlikely(cn
->isreset
)) {
1033 spin_unlock_bh(&(cn
->rcv_lock
));
1037 switch (cn
->sourcetype
) {
1038 case SOURCE_UNCONNECTED
:
1039 spin_unlock_bh(&(cn
->rcv_lock
));
1040 spin_lock_bh(&(cor_get_conn_reversedir(cn
)->rcv_lock
));
1041 if (likely(cor_get_conn_reversedir(cn
)->isreset
== 0 &&
1042 cor_get_conn_reversedir(cn
)->targettype
==
1043 TARGET_UNCONNECTED
))
1044 cor_proc_cpacket(cor_get_conn_reversedir(cn
));
1045 spin_unlock_bh(&(cor_get_conn_reversedir(cn
)->rcv_lock
));
1048 if (_cor_mngdsocket_flushtoconn(cn
) == RC_FTC_OK
&&
1049 cn
->src
.sock
.ed
->cs
!= 0 &&
1050 cor_sock_sndbufavailable(cn
, 1))
1051 cor_sk_write_space(cn
->src
.sock
.ed
->cs
);
1052 spin_unlock_bh(&(cn
->rcv_lock
));
1055 cor_drain_ooo_queue(cn
);
1056 if (likely(cn
->src
.in
.established
!= 0)) {
1057 cor_send_ack_conn_ifneeded(cn
, 0, 0);
1059 spin_unlock_bh(&(cn
->rcv_lock
));
1066 int __init
cor_forward_init(void)
1068 cor_data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
1069 sizeof(struct cor_data_buf_item
), 8, 0, 0);
1070 if (unlikely(cor_data_buf_item_slab
== 0))
1073 atomic64_set(&cor_bufused_sum
, 0);
1078 void __exit
cor_forward_exit2(void)
1080 BUG_ON(atomic64_read(&cor_bufused_sum
) != 0);
1082 kmem_cache_destroy(cor_data_buf_item_slab
);
1083 cor_data_buf_item_slab
= 0;
1086 MODULE_LICENSE("GPL");