/*
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
16 #include <linux/mutex.h>
20 struct kmem_cache
*cor_data_buf_item_slab
;
22 atomic64_t cor_bufused_sum
;
24 /* __u64 get_bufspace_used(void)
26 return atomic64_read(&cor_bufused_sum);
29 void cor_databuf_init(struct cor_conn
*cn_init
)
31 memset(&cn_init
->data_buf
, 0, sizeof(cn_init
->data_buf
));
32 INIT_LIST_HEAD(&cn_init
->data_buf
.items
);
35 void cor_bufsize_init(struct cor_conn
*cn_l
, __u32 bufsize
)
37 __u32 bufsize_shifted
;
39 memset(&cn_l
->bufsize
, 0, sizeof(cn_l
->bufsize
));
41 if (unlikely((bufsize
>> (32 - BUFSIZE_SHIFT
)) != 0))
42 bufsize_shifted
= U32_MAX
;
44 bufsize_shifted
= bufsize
<< 5;
46 cn_l
->bufsize
.bufsize
= bufsize_shifted
;
47 cn_l
->bufsize
.state
= BUFSIZE_NOACTION
;
48 cn_l
->bufsize
.act
.noact
.bytesleft
= bufsize
* 4;
51 int cor_account_bufspace(struct cor_conn
*cn_lx
)
53 __u64 space_needed
= 0;
55 __u64 bufused_sum_int
;
57 if (likely(cn_lx
->isreset
== 0)) {
58 space_needed
+= cn_lx
->data_buf
.datasize
;
59 space_needed
+= cn_lx
->data_buf
.overhead
;
60 if (cn_lx
->sourcetype
== SOURCE_IN
) {
61 space_needed
+= cn_lx
->src
.in
.reorder_memused
;
65 if (cn_lx
->bufspace_accounted
== space_needed
) {
66 if (space_needed
>= BUFUSAGE_PER_CONN_MAX
) {
68 } else if (atomic64_read(&cor_bufused_sum
) >=
69 BUFUSAGE_GLOBAL_MAX
) {
76 if (unlikely(space_needed
>= U32_MAX
))
79 space_req
= space_needed
;
81 if (cn_lx
->bufspace_accounted
== space_req
)
84 bufused_sum_int
= cor_update_atomic_sum(&cor_bufused_sum
,
85 cn_lx
->bufspace_accounted
, space_req
);
87 cn_lx
->bufspace_accounted
= space_req
;
89 if (cn_lx
->targettype
== TARGET_OUT
&& unlikely(
90 (cn_lx
->trgt
.out
.in_nb_busy_list
== 0) !=
91 (cn_lx
->data_buf
.datasize
== 0)))
92 cor_conn_set_last_act(cn_lx
);
94 if (space_needed
!= space_req
)
96 else if (bufused_sum_int
>= BUFUSAGE_GLOBAL_MAX
)
98 else if (space_needed
>= BUFUSAGE_PER_CONN_MAX
)
104 int cor_conn_src_unconn_write_allowed(struct cor_conn
*src_unconn_lx
)
106 BUG_ON(src_unconn_lx
->sourcetype
!= SOURCE_UNCONNECTED
);
108 if (src_unconn_lx
->data_buf
.datasize
== 0)
110 else if (src_unconn_lx
->data_buf
.datasize
< BUFFERLIMIT_SRC_UNCONN
&&
111 cor_account_bufspace(src_unconn_lx
) == 0)
117 void cor_update_windowlimit(struct cor_conn
*src_in_lx
)
121 BUG_ON(src_in_lx
->sourcetype
!= SOURCE_IN
);
123 bufsize
= src_in_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
125 if (src_in_lx
->targettype
!= TARGET_OUT
) {
126 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
127 cor_account_bufspace(src_in_lx
)) {
128 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
130 } else if (cor_seqno_before_eq(src_in_lx
->trgt
.out
.seqno_windowlimit
,
131 src_in_lx
->trgt
.out
.seqno_nextsend
)) {
132 if (cor_account_bufspace(src_in_lx
)) {
133 bufsize
= min(bufsize
, (__u32
) WINDOW_MAX_PER_CONN_MIN
);
136 __u32 windowleft
= src_in_lx
->trgt
.out
.seqno_windowlimit
-
137 src_in_lx
->trgt
.out
.seqno_nextsend
;
139 bufsize
= max(bufsize
, min(windowleft
,
140 (__u32
) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK
));
142 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
143 cor_account_bufspace(src_in_lx
)) {
144 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
148 if (bufsize
> WINDOW_MAX_PER_CONN_MAX
)
149 bufsize
= WINDOW_MAX_PER_CONN_MAX
;
151 if (unlikely(src_in_lx
->data_buf
.read_remaining
> bufsize
))
154 bufsize
-= src_in_lx
->data_buf
.read_remaining
;
156 if (unlikely(src_in_lx
->targettype
== TARGET_DISCARD
))
159 src_in_lx
->src
.in
.window_seqnolimit
=
160 src_in_lx
->src
.in
.next_seqno
+ bufsize
;
163 static int cor_bufsize_high_latency_sender(struct cor_conn
*cn_lx
)
165 struct cor_neighbor
*nb
;
169 if (cn_lx
->sourcetype
!= SOURCE_IN
)
172 nb
= cn_lx
->src
.in
.nb
;
173 if (unlikely(nb
== 0))
176 latency_us
= atomic_read(&nb
->latency_retrans_us
);
177 latency_us
+= atomic_read(&nb
->latency_stddev_retrans_us
);
178 latency_us
+= CMSG_MAXDELAY_ACKCONN_MS
* 1000;
180 return latency_us
> 100000 ? 1 : 0;
183 __u8
_cor_bufsize_update_get_changerate(struct cor_conn
*cn_lx
)
185 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
186 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
190 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
192 } else if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
193 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
196 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
198 if (high_latency_sender
)
200 if (high_latency_conn
)
203 changerate
= 128 - speed
;
204 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
205 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
208 if (high_latency_sender
)
211 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
213 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
216 changerate
= 128 + speed
;
221 /* printk(KERN_ERR "changerate1 %u\n", changerate); */
223 if (cn_lx
->targettype
== TARGET_OUT
) {
224 __u16 remote_changerate
= ((__u16
)
225 cn_lx
->trgt
.out
.remote_bufsize_changerate
) + 64;
226 /* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
227 changerate
= (changerate
* remote_changerate
) / 128;
228 /* printk(KERN_ERR "changerate3 %u\n", changerate); */
231 if (unlikely(changerate
< 64))
233 else if (unlikely(changerate
- 64 >= 256))
236 return (__u8
) (changerate
- 64);
239 static void _cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
240 int high_latency_sender
, int high_latency_conn
)
242 /* speed of increase/decrease changes with BUFSIZE_SHIFT */
243 BUILD_BUG_ON(BUFSIZE_SHIFT
!= 5);
246 * If you change the speed here, change it in
247 * _cor_bufsize_update_get_changerate too
250 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
251 if (likely(cn_lx
->bufsize
.act
.noact
.bytesleft
>= rcvd
)) {
252 cn_lx
->bufsize
.act
.noact
.bytesleft
-= rcvd
;
256 rcvd
-= cn_lx
->bufsize
.act
.noact
.bytesleft
;
257 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
258 cn_lx
->bufsize
.act
.decr
.size_start
= cn_lx
->bufsize
.bufsize
;
261 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
262 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
266 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
269 if (high_latency_sender
)
272 change
= rcvd
* speed
;
273 if (high_latency_conn
)
276 if (cn_lx
->bufsize
.bufsize
< change
)
277 cn_lx
->bufsize
.bufsize
= 0;
279 cn_lx
->bufsize
.bufsize
-= change
;
281 if (cn_lx
->bufsize
.act
.decr
.size_start
/ 4 >
282 cn_lx
->bufsize
.bufsize
)
283 cn_lx
->bufsize
.state
= BUFSIZE_DECR_FAST
;
284 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
285 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
289 if (high_latency_sender
)
292 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
294 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
297 change
= rcvd
* speed
;
298 if (cn_lx
->bufsize
.bufsize
+ change
< cn_lx
->bufsize
.bufsize
)
299 cn_lx
->bufsize
.bufsize
= U32_MAX
;
301 cn_lx
->bufsize
.bufsize
+= change
;
303 if (cn_lx
->bufsize
.bufsize
>=
304 cn_lx
->bufsize
.act
.incr
.size_end
) {
305 cn_lx
->bufsize
.bufsize
=
306 cn_lx
->bufsize
.act
.incr
.size_end
;
307 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
308 BUILD_BUG_ON(BUFSIZE_SHIFT
< 3);
309 if (high_latency_conn
) {
310 cn_lx
->bufsize
.act
.noact
.bytesleft
=
311 (cn_lx
->bufsize
.bufsize
>>
312 (BUFSIZE_SHIFT
- 3));
314 cn_lx
->bufsize
.act
.noact
.bytesleft
=
315 (cn_lx
->bufsize
.bufsize
>>
316 (BUFSIZE_SHIFT
- 2));
323 if (unlikely(rcvd
>= (1 << 24)) ||
324 cn_lx
->bufsize
.bytes_rcvd
+ rcvd
>= (1 << 24))
325 cn_lx
->bufsize
.bytes_rcvd
= (1 << 24) - 1;
327 cn_lx
->bufsize
.bytes_rcvd
+= rcvd
;
330 static __u32
cor_get_read_remaining_min(__u32 bufsize_bytes
,
331 int high_latency_sender
, int high_latency_conn
)
333 int bufspace_low
= (atomic64_read(&cor_bufused_sum
) >=
334 3 * (BUFUSAGE_GLOBAL_MAX
/ 4));
336 if (high_latency_conn
) {
337 if (high_latency_sender
) {
339 return bufsize_bytes
/ 6 + 1;
341 return bufsize_bytes
/ 3 + 1;
345 return bufsize_bytes
/ 8 + 1;
347 return bufsize_bytes
/ 4 + 1;
351 if (high_latency_sender
) {
353 return bufsize_bytes
/ 6 + 1;
355 return bufsize_bytes
/ 4 + 1;
359 return bufsize_bytes
/ 12 + 1;
361 return bufsize_bytes
/ 8 + 1;
367 static void cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
368 __u8 windowused
, __u8 rcv_flushrcvd
)
370 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
371 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
372 __u32 bufsize_bytes
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
373 __u32 read_remaining_min
= cor_get_read_remaining_min(bufsize_bytes
,
374 high_latency_sender
, high_latency_conn
);
375 __u32 read_remaining_min_nofastdecr
= read_remaining_min
* 2;
376 __u32 read_remaining_min_nodecr
= (read_remaining_min
+
377 read_remaining_min_nofastdecr
) / 2;
378 __u32 read_remaining
;
380 BUG_ON(cn_lx
->data_buf
.read_remaining
< rcvd
);
381 BUG_ON(windowused
> 31);
383 /* if (cn_lx->is_highlatency == 0)
384 printk(KERN_ERR "bufsize %p %u %u %u %u %u %u\n",
385 cn_lx, bufsize_bytes,
386 cn_lx->data_buf.read_remaining, rcvd,
387 windowused, rcv_flushrcvd,
388 cn_lx->bufsize.ignore_rcv_lowbuf); */
390 if (cn_lx
->bufsize
.ignore_rcv_lowbuf
> 0) {
391 if (rcvd
> cn_lx
->bufsize
.ignore_rcv_lowbuf
)
392 cn_lx
->bufsize
.ignore_rcv_lowbuf
= 0;
394 cn_lx
->bufsize
.ignore_rcv_lowbuf
-= rcvd
;
396 read_remaining
= bufsize_bytes
;
398 read_remaining
= max(cn_lx
->data_buf
.read_remaining
- rcvd
,
399 (bufsize_bytes
* (31 - windowused
)) / 31);
402 if (rcv_flushrcvd
!= 0) {
403 __u32 bytesleft
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
405 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
406 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
407 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
408 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
409 } else if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
&&
410 cn_lx
->bufsize
.act
.noact
.bytesleft
<
412 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
416 if (read_remaining
< read_remaining_min
) {
417 __u32 buf_increase_bytes
= read_remaining_min
- read_remaining
;
421 if (high_latency_sender
) {
422 if (buf_increase_bytes
< rcvd
/ 16)
423 buf_increase_bytes
= rcvd
/ 16;
425 if (buf_increase_bytes
< rcvd
/ 32)
426 buf_increase_bytes
= rcvd
/ 32;
429 buf_increase
= (buf_increase_bytes
<< BUFSIZE_SHIFT
);
430 if (unlikely((buf_increase
>> BUFSIZE_SHIFT
) !=
432 buf_increase
= U32_MAX
;
434 bufsize_end
= cn_lx
->bufsize
.bufsize
+ buf_increase
;
435 if (unlikely(bufsize_end
< cn_lx
->bufsize
.bufsize
))
436 bufsize_end
= U32_MAX
;
438 if (cn_lx
->bufsize
.state
!= BUFSIZE_INCR
&&
439 cn_lx
->bufsize
.state
!= BUFSIZE_INCR_FAST
) {
440 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
441 cn_lx
->bufsize
.act
.incr
.size_start
=
442 cn_lx
->bufsize
.bufsize
;
443 cn_lx
->bufsize
.act
.incr
.size_end
= 0;
446 if (bufsize_end
> cn_lx
->bufsize
.act
.incr
.size_end
)
447 cn_lx
->bufsize
.act
.incr
.size_end
= bufsize_end
;
448 if (bufsize_end
/ 4 > cn_lx
->bufsize
.act
.incr
.size_start
)
449 cn_lx
->bufsize
.state
= BUFSIZE_INCR_FAST
;
450 } else if (read_remaining
< read_remaining_min_nodecr
) {
451 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
||
452 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
453 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
457 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
)
458 bytesleft
= cn_lx
->bufsize
.act
.noact
.bytesleft
;
459 if (high_latency_conn
)
461 if (likely((cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
) *
462 rtt_mul
> bytesleft
))
463 bytesleft
= (cn_lx
->bufsize
.bufsize
>>
464 BUFSIZE_SHIFT
) * rtt_mul
;
466 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
467 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
469 } else if (read_remaining
< read_remaining_min_nofastdecr
) {
470 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
471 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
472 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
473 cn_lx
->bufsize
.act
.decr
.size_start
=
474 cn_lx
->bufsize
.bufsize
;
478 if (cn_lx
->targettype
== TARGET_OUT
) {
479 __u16 rem_changerate
= ((__u16
)
480 cn_lx
->trgt
.out
.remote_bufsize_changerate
) + 64;
481 __u16 crate_nofastdecr
;
483 __u16 crate_nofastincr
;
486 if (high_latency_conn
) {
487 crate_nofastdecr
= 128 - 128 / 8;
488 crate_nodecr
= 128 - 128 / 6;
489 crate_nofastincr
= 128 + 128 / 4;
490 crate_noincr
= 128 + 128 / 3;
492 crate_nofastdecr
= 128 - 128 / 16;
493 crate_nodecr
= 128 - 128 / 12;
494 crate_nofastincr
= 128 + 128 / 8;
495 crate_noincr
= 128 + 128 / 6;
498 if ((rem_changerate
< crate_nodecr
||
499 rem_changerate
> crate_noincr
) &&
500 cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
501 cn_lx
->bufsize
.act
.noact
.bytesleft
= max(
502 cn_lx
->bufsize
.act
.noact
.bytesleft
,
503 cn_lx
->bufsize
.bufsize
>>
507 if (rem_changerate
< crate_nodecr
&& (
508 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
509 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)) {
510 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
511 cn_lx
->bufsize
.act
.noact
.bytesleft
=
512 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
514 if (rem_changerate
< crate_nofastdecr
&&
515 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
516 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
517 cn_lx
->bufsize
.act
.decr
.size_start
=
518 cn_lx
->bufsize
.bufsize
;
521 if (rem_changerate
> crate_noincr
&& (
522 cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
523 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)) {
524 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
525 cn_lx
->bufsize
.act
.noact
.bytesleft
=
526 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
528 if (rem_changerate
> crate_nofastincr
&&
529 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
530 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
531 cn_lx
->bufsize
.act
.incr
.size_start
=
532 cn_lx
->bufsize
.bufsize
;
536 _cor_bufsize_update(cn_lx
, rcvd
, high_latency_sender
,
540 void cor_bufsize_read_to_sock(struct cor_conn
*trgt_sock_lx
)
542 unsigned long jiffies_tmp
= jiffies
;
543 __u32 latency_limit
= (trgt_sock_lx
->is_highlatency
!= 0 ?
547 * High cpu usage may cause high latency of the userspace receiver.
548 * Increasing bufferspace to compensate may increase latency further.
550 if (trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
!= 0 && time_before(
551 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace_since
,
552 jiffies
- latency_limit
)) {
553 trgt_sock_lx
->bufsize
.ignore_rcv_lowbuf
=
554 trgt_sock_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
557 if (trgt_sock_lx
->data_buf
.read_remaining
== 0) {
558 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
= 0;
560 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace
= 1;
561 trgt_sock_lx
->trgt
.sock
.waiting_for_userspace_since
=
566 static inline void cor_databuf_item_unlink(struct cor_conn
*cn_lx
,
567 struct cor_data_buf_item
*item
)
569 BUG_ON(item
== cn_lx
->data_buf
.nextread
);
570 list_del(&item
->buf_list
);
571 if (item
->type
== DATABUF_BUF
) {
572 cn_lx
->data_buf
.overhead
-= sizeof(struct cor_data_buf_item
) +
573 item
->buflen
- item
->datalen
;
574 } else if (item
->type
== DATABUF_SKB
) {
575 cn_lx
->data_buf
.overhead
-= sizeof(struct sk_buff
);
581 void cor_databuf_ackdiscard(struct cor_conn
*cn_lx
)
585 cn_lx
->data_buf
.next_read_offset
= 0;
586 cn_lx
->data_buf
.nextread
= 0;
588 while (!list_empty(&cn_lx
->data_buf
.items
)) {
589 struct cor_data_buf_item
*item
= container_of(
590 cn_lx
->data_buf
.items
.next
,
591 struct cor_data_buf_item
, buf_list
);
592 freed
+= item
->datalen
;
594 cor_databuf_item_unlink(cn_lx
, item
);
595 cor_databuf_item_free(item
);
598 cn_lx
->data_buf
.datasize
-= freed
;
599 cn_lx
->data_buf
.first_offset
+= freed
;
601 BUG_ON(cn_lx
->data_buf
.datasize
!= 0);
602 BUG_ON(cn_lx
->data_buf
.overhead
!= 0);
604 cn_lx
->data_buf
.read_remaining
= 0;
607 void cor_reset_seqno(struct cor_conn
*cn_l
, __u32 initseqno
)
609 cn_l
->data_buf
.first_offset
= initseqno
-
610 cn_l
->data_buf
.datasize
+
611 cn_l
->data_buf
.read_remaining
;
614 static void cor_databuf_nextreadchunk(struct cor_conn
*cn_lx
)
616 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
617 BUG_ON(cn_lx
->data_buf
.next_read_offset
!=
618 cn_lx
->data_buf
.nextread
->datalen
);
620 if (&cn_lx
->data_buf
.nextread
->buf_list
== cn_lx
->data_buf
.items
.prev
) {
621 BUG_ON(cn_lx
->data_buf
.read_remaining
!= 0);
622 cn_lx
->data_buf
.nextread
= 0;
625 BUG_ON(cn_lx
->data_buf
.read_remaining
== 0);
626 cn_lx
->data_buf
.nextread
= container_of(
627 cn_lx
->data_buf
.nextread
->buf_list
.next
,
628 struct cor_data_buf_item
, buf_list
);
631 cn_lx
->data_buf
.next_read_offset
= 0;
634 void cor_databuf_pull(struct cor_conn
*cn_lx
, char *dst
, __u32 len
)
636 BUG_ON(cn_lx
->data_buf
.read_remaining
< len
);
641 char *srcbufcpystart
= 0;
642 int srcbufcpylen
= 0;
644 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
645 BUG_ON(cn_lx
->data_buf
.next_read_offset
>=
646 cn_lx
->data_buf
.nextread
->datalen
);
648 srcbufcpystart
= cn_lx
->data_buf
.nextread
->buf
+
649 cn_lx
->data_buf
.next_read_offset
;
650 srcbufcpylen
= cn_lx
->data_buf
.nextread
->datalen
-
651 cn_lx
->data_buf
.next_read_offset
;
653 if (cpy
> srcbufcpylen
)
656 memcpy(dst
, srcbufcpystart
, cpy
);
661 cn_lx
->data_buf
.read_remaining
-= cpy
;
662 cn_lx
->data_buf
.next_read_offset
+= cpy
;
664 if (cpy
== srcbufcpylen
)
665 cor_databuf_nextreadchunk(cn_lx
);
669 void cor_databuf_unpull_dpi(struct cor_conn
*trgt_sock
, struct cor_sock
*cs
,
670 struct cor_data_buf_item
*item
, __u16 next_read_offset
)
672 BUG_ON(next_read_offset
> item
->datalen
);
674 if (next_read_offset
>= item
->datalen
)
677 spin_lock_bh(&trgt_sock
->rcv_lock
);
679 if (unlikely(cor_is_trgt_sock(trgt_sock
, cs
) == 0)) {
680 spin_unlock_bh(&trgt_sock
->rcv_lock
);
684 BUG_ON(trgt_sock
->data_buf
.nextread
!= 0 &&
685 &trgt_sock
->data_buf
.nextread
->buf_list
!=
686 trgt_sock
->data_buf
.items
.next
);
687 BUG_ON(trgt_sock
->data_buf
.next_read_offset
!= 0);
689 trgt_sock
->data_buf
.first_offset
-= item
->datalen
;
690 trgt_sock
->data_buf
.datasize
+= item
->datalen
;
691 trgt_sock
->data_buf
.read_remaining
+= item
->datalen
- next_read_offset
;
693 if (item
->type
== DATABUF_BUF
) {
694 trgt_sock
->data_buf
.overhead
+=
695 sizeof(struct cor_data_buf_item
) +
696 item
->buflen
- item
->datalen
;
697 } else if (item
->type
== DATABUF_SKB
) {
698 trgt_sock
->data_buf
.overhead
+= sizeof(struct sk_buff
);
703 list_add(&item
->buf_list
, &trgt_sock
->data_buf
.items
);
705 trgt_sock
->data_buf
.nextread
= item
;
706 trgt_sock
->data_buf
.next_read_offset
= next_read_offset
;
708 cor_account_bufspace(trgt_sock
);
710 spin_unlock_bh(&trgt_sock
->rcv_lock
);
714 cor_databuf_item_free(item
);
718 void cor_databuf_pull_dbi(struct cor_sock
*cs_rl
, struct cor_conn
*trgt_sock_l
)
720 struct cor_data_buf_item
*dbi
= 0;
721 BUG_ON(cs_rl
->type
!= CS_TYPE_CONN_RAW
);
722 BUG_ON(cs_rl
->data
.conn_raw
.rcvitem
!= 0);
724 if (trgt_sock_l
->data_buf
.read_remaining
== 0)
727 BUG_ON(trgt_sock_l
->data_buf
.nextread
== 0);
728 BUG_ON(trgt_sock_l
->data_buf
.next_read_offset
>=
729 trgt_sock_l
->data_buf
.nextread
->datalen
);
730 dbi
= trgt_sock_l
->data_buf
.nextread
;
732 BUG_ON(&dbi
->buf_list
!= trgt_sock_l
->data_buf
.items
.next
);
734 cs_rl
->data
.conn_raw
.rcvitem
= dbi
;
735 cs_rl
->data
.conn_raw
.rcvoffset
= trgt_sock_l
->data_buf
.next_read_offset
;
737 trgt_sock_l
->data_buf
.first_offset
+= dbi
->datalen
;
738 trgt_sock_l
->data_buf
.datasize
-= dbi
->datalen
;
739 trgt_sock_l
->data_buf
.read_remaining
-= dbi
->datalen
;
741 cor_account_bufspace(trgt_sock_l
);
743 trgt_sock_l
->data_buf
.next_read_offset
= dbi
->datalen
;
744 cor_databuf_nextreadchunk(trgt_sock_l
);
746 cor_databuf_item_unlink(trgt_sock_l
, dbi
);
749 void cor_databuf_unpull(struct cor_conn
*trgt_out_l
, __u32 bytes
)
751 trgt_out_l
->data_buf
.read_remaining
+= bytes
;
753 BUG_ON(list_empty(&trgt_out_l
->data_buf
.items
) != 0);
755 if (trgt_out_l
->data_buf
.nextread
== 0) {
756 BUG_ON(trgt_out_l
->data_buf
.next_read_offset
!= 0);
758 trgt_out_l
->data_buf
.nextread
= container_of(
759 trgt_out_l
->data_buf
.items
.prev
,
760 struct cor_data_buf_item
, buf_list
);
763 while (bytes
> trgt_out_l
->data_buf
.next_read_offset
) {
764 bytes
-= trgt_out_l
->data_buf
.next_read_offset
;
765 trgt_out_l
->data_buf
.nextread
= container_of(
766 trgt_out_l
->data_buf
.nextread
->buf_list
.prev
,
767 struct cor_data_buf_item
, buf_list
);
768 BUG_ON(&trgt_out_l
->data_buf
.nextread
->buf_list
==
769 &trgt_out_l
->data_buf
.items
);
770 trgt_out_l
->data_buf
.next_read_offset
=
771 trgt_out_l
->data_buf
.nextread
->datalen
;
774 trgt_out_l
->data_buf
.next_read_offset
-= bytes
;
777 void cor_databuf_pullold(struct cor_conn
*trgt_out_l
, __u32 startpos
, char *dst
,
780 __u32 pos
= trgt_out_l
->data_buf
.first_offset
;
781 struct cor_data_buf_item
*dbi
= container_of(
782 trgt_out_l
->data_buf
.items
.next
,
783 struct cor_data_buf_item
, buf_list
);
786 BUG_ON(&dbi
->buf_list
== &trgt_out_l
->data_buf
.items
);
788 if (cor_seqno_after(pos
+ dbi
->datalen
, startpos
))
792 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
799 char *srcbufcpystart
= 0;
800 int srcbufcpylen
= 0;
802 __u32 offset
= startpos
- pos
;
804 BUG_ON(&dbi
->buf_list
== &trgt_out_l
->data_buf
.items
);
806 BUG_ON(cor_seqno_before(startpos
, pos
));
807 BUG_ON(offset
> dbi
->datalen
);
809 srcbufcpystart
= dbi
->buf
+ offset
;
810 srcbufcpylen
= dbi
->datalen
- offset
;
812 if (cpy
> srcbufcpylen
)
815 memcpy(dst
, srcbufcpystart
, cpy
);
822 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
827 /* ack up to *not* including pos */
828 void cor_databuf_ack(struct cor_conn
*trgt_out_l
, __u32 pos
)
832 while (!list_empty(&trgt_out_l
->data_buf
.items
)) {
833 struct cor_data_buf_item
*firstitem
= container_of(
834 trgt_out_l
->data_buf
.items
.next
,
835 struct cor_data_buf_item
, buf_list
);
837 if (firstitem
== trgt_out_l
->data_buf
.nextread
)
840 if (cor_seqno_after_eq(trgt_out_l
->data_buf
.first_offset
+
841 firstitem
->datalen
, pos
))
844 trgt_out_l
->data_buf
.first_offset
+= firstitem
->datalen
;
845 acked
+= firstitem
->datalen
;
847 cor_databuf_item_unlink(trgt_out_l
, firstitem
);
848 cor_databuf_item_free(firstitem
);
851 trgt_out_l
->data_buf
.datasize
-= acked
;
853 BUG_ON(trgt_out_l
->data_buf
.datasize
== 0 &&
854 trgt_out_l
->data_buf
.overhead
!= 0);
856 if (unlikely(trgt_out_l
->trgt
.out
.nblist_busy_remaining
<= acked
)) {
857 trgt_out_l
->trgt
.out
.nblist_busy_remaining
= 0;
858 cor_conn_set_last_act(trgt_out_l
);
860 trgt_out_l
->trgt
.out
.nblist_busy_remaining
-= acked
;
864 cor_account_bufspace(trgt_out_l
);
867 void cor_databuf_ackread(struct cor_conn
*cn_lx
)
871 while (!list_empty(&cn_lx
->data_buf
.items
)) {
872 struct cor_data_buf_item
*firstitem
= container_of(
873 cn_lx
->data_buf
.items
.next
,
874 struct cor_data_buf_item
, buf_list
);
876 if (firstitem
== cn_lx
->data_buf
.nextread
)
879 acked
+= firstitem
->datalen
;
881 cor_databuf_item_unlink(cn_lx
, firstitem
);
882 cor_databuf_item_free(firstitem
);
885 cn_lx
->data_buf
.datasize
-= acked
;
886 cn_lx
->data_buf
.first_offset
+= acked
;
888 BUG_ON(cn_lx
->data_buf
.datasize
== 0 && cn_lx
->data_buf
.overhead
!= 0);
890 if (cn_lx
->targettype
== TARGET_OUT
) {
891 if (unlikely(cn_lx
->trgt
.out
.nblist_busy_remaining
<=
893 cn_lx
->trgt
.out
.nblist_busy_remaining
= 0;
894 cor_conn_set_last_act(cn_lx
);
896 cn_lx
->trgt
.out
.nblist_busy_remaining
-= acked
;
901 cor_account_bufspace(cn_lx
);
904 __u32
_cor_receive_buf(struct cor_conn
*cn_lx
, char *buf
, __u32 datalen
,
905 int from_sock
, __u8 windowused
, __u8 flush
)
907 struct cor_data_buf_item
*item
= 0;
911 if (list_empty(&cn_lx
->data_buf
.items
) == 0) {
912 struct list_head
*last
= cn_lx
->data_buf
.items
.prev
;
914 item
= container_of(last
, struct cor_data_buf_item
, buf_list
);
917 while (datalen
> 0) {
920 BUG_ON(cn_lx
->data_buf
.datasize
+ datalen
> (1 << 30));
921 BUG_ON(cn_lx
->data_buf
.overhead
> (1 << 30));
923 if (item
== 0 || item
->type
!= DATABUF_BUF
||
924 item
->buflen
<= item
->datalen
) {
925 item
= kmem_cache_alloc(cor_data_buf_item_slab
,
927 if (unlikely(item
== 0))
930 memset(item
, 0, sizeof(struct cor_data_buf_item
));
931 item
->type
= DATABUF_BUF
;
933 item
->buflen
= cor_buf_optlen(datalen
, from_sock
);
934 item
->buf
= kmalloc(item
->buflen
, GFP_ATOMIC
);
936 if (unlikely(item
->buf
== 0)) {
937 kmem_cache_free(cor_data_buf_item_slab
, item
);
942 list_add_tail(&item
->buf_list
, &cn_lx
->data_buf
.items
);
944 cn_lx
->data_buf
.overhead
+= item
->buflen
+
945 sizeof(struct cor_data_buf_item
);
948 BUG_ON(item
->type
!= DATABUF_BUF
);
949 BUG_ON(item
->buflen
<= item
->datalen
);
951 if (cn_lx
->data_buf
.nextread
== 0) {
952 cn_lx
->data_buf
.nextread
= item
;
953 cn_lx
->data_buf
.next_read_offset
= item
->datalen
;
956 if (item
->buflen
- item
->datalen
< cpy
)
957 cpy
= (item
->buflen
- item
->datalen
);
959 memcpy(item
->buf
+ item
->datalen
, buf
, cpy
);
960 item
->datalen
+= cpy
;
962 BUG_ON(cpy
> datalen
);
967 cn_lx
->data_buf
.read_remaining
+= cpy
;
968 cn_lx
->data_buf
.datasize
+= cpy
;
969 cn_lx
->data_buf
.overhead
-= cpy
;
970 BUG_ON(cn_lx
->data_buf
.datasize
!= 0 &&
971 cn_lx
->data_buf
.overhead
== 0);
976 cn_lx
->flush
= flush
;
978 cor_account_bufspace(cn_lx
);
979 cor_bufsize_update(cn_lx
, totalcpy
, windowused
, flush
);
984 __u32
cor_receive_skb(struct cor_conn
*src_in_l
, struct sk_buff
*skb
,
985 __u8 windowused
, __u8 flush
)
987 struct cor_skb_procstate
*ps
= cor_skb_pstate(skb
);
988 struct cor_data_buf_item
*item
= &ps
->funcstate
.rcv
.dbi
;
990 __u32 bufferleft
= 0;
992 BUG_ON(skb
->len
<= 0);
994 if (unlikely(unlikely(src_in_l
->data_buf
.datasize
+ skb
->len
>
995 (1 << 30)) || unlikely(src_in_l
->data_buf
.overhead
>
999 if (list_empty(&src_in_l
->data_buf
.items
) == 0) {
1000 struct list_head
*last
= src_in_l
->data_buf
.items
.prev
;
1001 struct cor_data_buf_item
*item
= container_of(last
,
1002 struct cor_data_buf_item
, buf_list
);
1003 bufferleft
= item
->buflen
- item
->datalen
;
1006 if (skb
->len
< (sizeof(struct sk_buff
) + bufferleft
)) {
1007 __u32 rc
= cor_receive_buf(src_in_l
, skb
->data
, skb
->len
,
1009 if (likely(rc
== skb
->len
))
1014 memset(item
, 0, sizeof(struct cor_data_buf_item
));
1016 item
->type
= DATABUF_SKB
;
1017 item
->buf
= skb
->data
;
1018 item
->datalen
= skb
->len
;
1019 item
->buflen
= item
->datalen
;
1020 list_add_tail(&item
->buf_list
, &src_in_l
->data_buf
.items
);
1021 if (src_in_l
->data_buf
.nextread
== 0)
1022 src_in_l
->data_buf
.nextread
= item
;
1024 src_in_l
->data_buf
.read_remaining
+= item
->datalen
;
1025 src_in_l
->data_buf
.datasize
+= item
->datalen
;
1026 src_in_l
->data_buf
.overhead
+= sizeof(struct sk_buff
);
1028 cor_account_bufspace(src_in_l
);
1029 cor_bufsize_update(src_in_l
, skb
->len
, windowused
, flush
);
1031 src_in_l
->flush
= flush
;
1036 void cor_wake_sender(struct cor_conn
*cn
)
1038 spin_lock_bh(&cn
->rcv_lock
);
1040 if (unlikely(cn
->isreset
)) {
1041 spin_unlock_bh(&cn
->rcv_lock
);
1045 switch (cn
->sourcetype
) {
1046 case SOURCE_UNCONNECTED
:
1047 spin_unlock_bh(&cn
->rcv_lock
);
1048 spin_lock_bh(&cor_get_conn_reversedir(cn
)->rcv_lock
);
1049 if (likely(cor_get_conn_reversedir(cn
)->isreset
== 0 &&
1050 cor_get_conn_reversedir(cn
)->targettype
==
1051 TARGET_UNCONNECTED
))
1052 cor_proc_cpacket(cor_get_conn_reversedir(cn
));
1053 spin_unlock_bh(&cor_get_conn_reversedir(cn
)->rcv_lock
);
1056 if (_cor_mngdsocket_flushtoconn(cn
) == RC_FTC_OK
&&
1057 cn
->src
.sock
.ed
->cs
!= 0 &&
1058 cor_sock_sndbufavailable(cn
, 1))
1059 cor_sk_write_space(cn
->src
.sock
.ed
->cs
);
1060 spin_unlock_bh(&cn
->rcv_lock
);
1063 cor_drain_ooo_queue(cn
);
1064 if (likely(cn
->src
.in
.established
!= 0))
1065 cor_send_ack_conn_ifneeded(cn
, 0, 0);
1066 spin_unlock_bh(&cn
->rcv_lock
);
1073 int __init
cor_forward_init(void)
1075 cor_data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
1076 sizeof(struct cor_data_buf_item
), 8, 0, 0);
1077 if (unlikely(cor_data_buf_item_slab
== 0))
1080 atomic64_set(&cor_bufused_sum
, 0);
1085 void __exit
cor_forward_exit2(void)
1087 BUG_ON(atomic64_read(&cor_bufused_sum
) != 0);
1089 kmem_cache_destroy(cor_data_buf_item_slab
);
1090 cor_data_buf_item_slab
= 0;
1093 MODULE_LICENSE("GPL");