/*
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
25 struct kmem_cache
*cor_data_buf_item_slab
;
27 atomic64_t cor_bufused_sum
;
/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */
34 void cor_databuf_init(struct cor_conn
*cn_init
)
36 memset(&(cn_init
->data_buf
), 0, sizeof(cn_init
->data_buf
));
37 INIT_LIST_HEAD(&(cn_init
->data_buf
.items
));
40 void cor_bufsize_init(struct cor_conn
*cn_l
, __u32 bufsize
)
42 __u32 bufsize_shifted
;
44 memset(&(cn_l
->bufsize
), 0, sizeof(cn_l
->bufsize
));
46 if (unlikely((bufsize
>> (32 - BUFSIZE_SHIFT
)) != 0))
47 bufsize_shifted
= U32_MAX
;
49 bufsize_shifted
= bufsize
<< 5;
51 cn_l
->bufsize
.bufsize
= bufsize_shifted
;
52 cn_l
->bufsize
.state
= BUFSIZE_NOACTION
;
53 cn_l
->bufsize
.act
.noact
.bytesleft
= bufsize
* 4;
56 int cor_account_bufspace(struct cor_conn
*cn_lx
)
58 __u64 space_needed
= 0;
60 __u64 bufused_sum_int
;
62 if (likely(cn_lx
->isreset
== 0)) {
63 space_needed
+= cn_lx
->data_buf
.datasize
;
64 space_needed
+= cn_lx
->data_buf
.overhead
;
65 if (cn_lx
->sourcetype
== SOURCE_IN
) {
66 space_needed
+= cn_lx
->source
.in
.reorder_memused
;
70 if (cn_lx
->bufspace_accounted
== space_needed
) {
71 if (space_needed
>= BUFUSAGE_PER_CONN_MAX
) {
73 } else if (atomic64_read(&cor_bufused_sum
) >=
74 BUFUSAGE_GLOBAL_MAX
) {
81 if (unlikely(space_needed
>= U32_MAX
))
84 space_req
= space_needed
;
86 if (cn_lx
->bufspace_accounted
== space_req
)
89 bufused_sum_int
= cor_update_atomic_sum(&cor_bufused_sum
,
90 cn_lx
->bufspace_accounted
, space_req
);
92 cn_lx
->bufspace_accounted
= space_req
;
94 if (cn_lx
->targettype
== TARGET_OUT
&& unlikely(
95 unlikely(cn_lx
->target
.out
.in_nb_busy_list
== 0) !=
96 unlikely(cn_lx
->data_buf
.datasize
== 0)))
97 cor_conn_set_last_act(cn_lx
);
99 if (space_needed
!= space_req
)
101 else if (bufused_sum_int
>= BUFUSAGE_GLOBAL_MAX
)
103 else if (space_needed
>= BUFUSAGE_PER_CONN_MAX
)
109 int cor_conn_src_unconn_write_allowed(struct cor_conn
*src_unconn_lx
)
111 BUG_ON(src_unconn_lx
->sourcetype
!= SOURCE_UNCONNECTED
);
113 if (src_unconn_lx
->data_buf
.datasize
== 0)
115 else if (src_unconn_lx
->data_buf
.datasize
< BUFFERLIMIT_SRC_UNCONN
&&
116 cor_account_bufspace(src_unconn_lx
) == 0)
122 void cor_update_windowlimit(struct cor_conn
*src_in_lx
)
126 BUG_ON(src_in_lx
->sourcetype
!= SOURCE_IN
);
128 bufsize
= src_in_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
130 if (src_in_lx
->targettype
!= TARGET_OUT
) {
131 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
132 cor_account_bufspace(src_in_lx
)) {
133 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
135 } else if (cor_seqno_before_eq(src_in_lx
->target
.out
.seqno_windowlimit
,
136 src_in_lx
->target
.out
.seqno_nextsend
)) {
137 if (cor_account_bufspace(src_in_lx
)) {
138 bufsize
= min(bufsize
, (__u32
) WINDOW_MAX_PER_CONN_MIN
);
141 __u32 windowleft
= (__u32
) min((__u64
) U32_MAX
,
143 src_in_lx
->target
.out
.seqno_windowlimit
-
144 src_in_lx
->target
.out
.seqno_nextsend
));
146 bufsize
= max(bufsize
, min(windowleft
,
147 (__u32
) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK
));
149 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
150 cor_account_bufspace(src_in_lx
)) {
151 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
155 if (bufsize
> WINDOW_MAX_PER_CONN_MAX
)
156 bufsize
= WINDOW_MAX_PER_CONN_MAX
;
158 /* printk(KERN_ERR "window %p %u %u\n", src_in_lx, bufsize,
159 src_in_lx->data_buf.read_remaining); */
161 if (unlikely(src_in_lx
->data_buf
.read_remaining
> bufsize
))
164 bufsize
-= src_in_lx
->data_buf
.read_remaining
;
166 if (unlikely(src_in_lx
->targettype
== TARGET_DISCARD
))
169 src_in_lx
->source
.in
.window_seqnolimit
=
170 src_in_lx
->source
.in
.next_seqno
+ bufsize
;
173 static int cor_bufsize_high_latency_sender(struct cor_conn
*cn_lx
)
175 struct cor_neighbor
*nb
;
179 if (cn_lx
->sourcetype
!= SOURCE_IN
)
182 nb
= cn_lx
->source
.in
.nb
;
183 if (unlikely(nb
== 0))
186 latency_us
= atomic_read(&(nb
->latency_retrans_us
));
187 latency_us
+= atomic_read(&(nb
->latency_stddev_retrans_us
));
188 latency_us
+= CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS
* 1000;
190 return latency_us
> 100000 ? 1 : 0;
/*
 * _cor_bufsize_update_get_changerate() - encode this conn's current bufsize
 * growth/shrink tendency as a rate around 128 (128 == unchanged), scaled by
 * the remote side's rate for TARGET_OUT conns, and returned biased by -64
 * as a __u8 (see the "changerate - 64" clamp/return below).
 *
 * NOTE(review): this listing is incomplete — the declarations of
 * speed/changerate and the statements assigning the speed factors
 * (original lines 197-199, 201, 204-212, 216-230, 236, 240-246) are
 * missing; the surviving lines are kept exactly as extracted.  The speed
 * values must stay in sync with _cor_bufsize_update (see its comment).
 */
193 __u8 _cor_bufsize_update_get_changerate(struct cor_conn *cn_lx)
195 int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
196 int high_latency_conn = (cn_lx->is_highlatency != 0);
/* NOACTION: rate stays at the neutral value (assignment line missing) */
200 if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
202 } else if (cn_lx->bufsize.state == BUFSIZE_DECR ||
203 cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
/* shrinking: speed scaled up for DECR_FAST / latency (factors missing) */
206 if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
208 if (high_latency_sender)
210 if (high_latency_conn)
213 changerate = 128 - speed;
214 } else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
215 cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
/* growing: faster during the initial phase / INCR_FAST (factors missing) */
218 if (high_latency_sender)
221 if (unlikely(cor_bufsize_initial_phase(cn_lx)))
223 else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
226 changerate = 128 + speed;
231 /* printk(KERN_ERR "changerate1 %u\n", changerate); */
/* TARGET_OUT: combine with the remote changerate (stored biased by -64;
 * the "+ 64" continuation line is missing here) */
233 if (cn_lx->targettype == TARGET_OUT) {
234 __u16 remote_changerate = ((__u16)
235 cn_lx->target.out.remote_bufsize_changerate) +
237 /* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
238 changerate = (changerate * remote_changerate) / 128;
239 /* printk(KERN_ERR "changerate3 %u\n", changerate); */
/* clamp so the biased result fits a __u8 (clamp bodies missing) */
242 if (unlikely(changerate < 64))
244 else if (unlikely(changerate - 64 >= 256))
247 return (__u8) (changerate - 64);
/*
 * _cor_bufsize_update() - advance the bufsize state machine after rcvd
 * bytes were received: consume the NOACTION byte budget, then shrink
 * (DECR/DECR_FAST) or grow (INCR/INCR_FAST) bufsize proportionally to
 * rcvd, and saturate the bytes_rcvd counter below 1 << 24.
 *
 * NOTE(review): this listing is incomplete — the declarations of
 * speed/change, the speed factor assignments (original lines ~274-286,
 * ~298-307), the shift amounts for the new noact.bytesleft budget
 * (lines 322, 326) and several braces are missing; the surviving lines
 * are kept exactly as extracted.
 */
250 static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
251 int high_latency_sender, int high_latency_conn)
253 /* speed of increase/decrease changes with BUFSIZE_SHIFT */
254 BUG_ON(BUFSIZE_SHIFT != 5);
257 * If you change the speed here, change it in
258 * _cor_bufsize_update_get_changerate too
/* NOACTION: consume the byte budget; once exhausted, the remainder of
 * rcvd starts a DECR phase from the current bufsize */
261 if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
262 if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
263 cn_lx->bufsize.act.noact.bytesleft -= rcvd;
267 rcvd -= cn_lx->bufsize.act.noact.bytesleft;
268 cn_lx->bufsize.state = BUFSIZE_DECR;
269 cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
/* DECR/DECR_FAST: shrink by rcvd * speed, clamped at 0; escalate to
 * DECR_FAST once below a quarter of the starting size */
272 if (cn_lx->bufsize.state == BUFSIZE_DECR ||
273 cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
277 if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
280 if (high_latency_sender)
283 change = rcvd * speed;
284 if (high_latency_conn)
287 if (cn_lx->bufsize.bufsize < change)
288 cn_lx->bufsize.bufsize = 0;
290 cn_lx->bufsize.bufsize -= change;
292 if (cn_lx->bufsize.act.decr.size_start/4 >
293 cn_lx->bufsize.bufsize)
294 cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
/* INCR/INCR_FAST: grow by rcvd * speed with U32_MAX saturation; on
 * reaching size_end, return to NOACTION with a fresh byte budget
 * (budget shift amounts differ by high_latency_conn; lines missing) */
295 } else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
296 cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
300 if (high_latency_sender)
303 if (unlikely(cor_bufsize_initial_phase(cn_lx)))
305 else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
308 change = rcvd * speed;
309 if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
310 cn_lx->bufsize.bufsize = U32_MAX;
312 cn_lx->bufsize.bufsize += change;
314 if (cn_lx->bufsize.bufsize >=
315 cn_lx->bufsize.act.incr.size_end) {
316 cn_lx->bufsize.bufsize =
317 cn_lx->bufsize.act.incr.size_end;
318 cn_lx->bufsize.state = BUFSIZE_NOACTION;
319 if (high_latency_conn) {
320 cn_lx->bufsize.act.noact.bytesleft =
321 (cn_lx->bufsize.bufsize >>
324 cn_lx->bufsize.act.noact.bytesleft =
325 (cn_lx->bufsize.bufsize >>
/* saturating bytes_rcvd counter, capped just below 1 << 24 */
333 if (unlikely(rcvd >= (1 << 24)) ||
334 cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
335 cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
337 cn_lx->bufsize.bytes_rcvd += rcvd;
340 static __u32
cor_get_read_remaining_min(__u32 bufsize_bytes
,
341 int high_latency_sender
, int high_latency_conn
)
343 int bufspace_low
= (atomic64_read(&cor_bufused_sum
) >=
344 3*(BUFUSAGE_GLOBAL_MAX
/4));
346 if (high_latency_conn
) {
347 if (high_latency_sender
) {
349 return bufsize_bytes
/6 + 1;
351 return bufsize_bytes
/3 + 1;
355 return bufsize_bytes
/8 + 1;
357 return bufsize_bytes
/4 + 1;
361 if (high_latency_sender
) {
363 return bufsize_bytes
/6 + 1;
365 return bufsize_bytes
/4 + 1;
369 return bufsize_bytes
/12 + 1;
371 return bufsize_bytes
/8 + 1;
377 static void cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
378 __u8 windowused
, __u8 rcv_flushrcvd
)
380 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
381 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
382 __u32 bufsize_bytes
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
383 __u32 read_remaining_min
= cor_get_read_remaining_min(bufsize_bytes
,
384 high_latency_sender
, high_latency_conn
);
385 __u32 read_remaining_min_nofastdecr
= read_remaining_min
*2;
386 __u32 read_remaining_min_nodecr
= (read_remaining_min
+
387 read_remaining_min_nofastdecr
)/2;
388 __u32 read_remaining
;
390 BUG_ON(cn_lx
->data_buf
.read_remaining
< rcvd
);
391 BUG_ON(windowused
> 31);
393 if (cn_lx
->bufsize
.ignore_rcv_lowbuf
> 0) {
394 if (rcvd
> cn_lx
->bufsize
.ignore_rcv_lowbuf
)
395 cn_lx
->bufsize
.ignore_rcv_lowbuf
= 0;
397 cn_lx
->bufsize
.ignore_rcv_lowbuf
-= rcvd
;
399 read_remaining
= bufsize_bytes
;
401 read_remaining
= max(cn_lx
->data_buf
.read_remaining
- rcvd
,
402 (bufsize_bytes
* (31 - windowused
)) / 31);
405 if (rcv_flushrcvd
!= 0) {
406 __u32 bytesleft
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
407 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
408 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
409 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
410 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
411 } else if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
&&
412 cn_lx
->bufsize
.act
.noact
.bytesleft
<
414 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
418 if (read_remaining
< read_remaining_min
) {
419 __u32 buf_increase_bytes
= read_remaining_min
- read_remaining
;
423 if (high_latency_sender
) {
424 if (buf_increase_bytes
< rcvd
/16)
425 buf_increase_bytes
= rcvd
/16;
427 if (buf_increase_bytes
< rcvd
/32)
428 buf_increase_bytes
= rcvd
/32;
431 buf_increase
= (buf_increase_bytes
<< BUFSIZE_SHIFT
);
432 if (unlikely((buf_increase
>> BUFSIZE_SHIFT
) !=
434 buf_increase
= U32_MAX
;
436 bufsize_end
= cn_lx
->bufsize
.bufsize
+ buf_increase
;
437 if (unlikely(bufsize_end
< cn_lx
->bufsize
.bufsize
))
438 bufsize_end
= U32_MAX
;
440 if (cn_lx
->bufsize
.state
!= BUFSIZE_INCR
&&
441 cn_lx
->bufsize
.state
!= BUFSIZE_INCR_FAST
) {
442 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
443 cn_lx
->bufsize
.act
.incr
.size_start
=
444 cn_lx
->bufsize
.bufsize
;
445 cn_lx
->bufsize
.act
.incr
.size_end
= 0;
448 if (bufsize_end
> cn_lx
->bufsize
.act
.incr
.size_end
)
449 cn_lx
->bufsize
.act
.incr
.size_end
= bufsize_end
;
450 if (bufsize_end
/4 > cn_lx
->bufsize
.act
.incr
.size_start
)
451 cn_lx
->bufsize
.state
= BUFSIZE_INCR_FAST
;
452 } else if (read_remaining
< read_remaining_min_nodecr
) {
453 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
||
454 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
455 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
458 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
)
459 bytesleft
= cn_lx
->bufsize
.act
.noact
.bytesleft
;
460 if (high_latency_conn
)
462 if (likely((cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
) *
463 rtt_mul
> bytesleft
))
464 bytesleft
= (cn_lx
->bufsize
.bufsize
>>
465 BUFSIZE_SHIFT
) * rtt_mul
;
467 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
468 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
470 } else if (read_remaining
< read_remaining_min_nofastdecr
) {
471 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
472 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
473 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
474 cn_lx
->bufsize
.act
.decr
.size_start
=
475 cn_lx
->bufsize
.bufsize
;
479 if (cn_lx
->targettype
== TARGET_OUT
) {
480 __u16 rem_changerate
= ((__u16
)
481 cn_lx
->target
.out
.remote_bufsize_changerate
) +
483 __u16 crate_nofastdecr
;
485 __u16 crate_nofastincr
;
488 if (high_latency_conn
) {
489 crate_nofastdecr
= 128 - 128/8;
490 crate_nodecr
= 128 - 128/6;
491 crate_nofastincr
= 128 + 128/4;
492 crate_noincr
= 128 + 128/3;
494 crate_nofastdecr
= 128 - 128/16;
495 crate_nodecr
= 128 - 128/12;
496 crate_nofastincr
= 128 + 128/8;
497 crate_noincr
= 128 + 128/6;
500 if ((rem_changerate
< crate_nodecr
||
501 rem_changerate
> crate_noincr
) &&
502 cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
503 cn_lx
->bufsize
.act
.noact
.bytesleft
= max(
504 cn_lx
->bufsize
.act
.noact
.bytesleft
,
505 cn_lx
->bufsize
.bufsize
>>
509 if (rem_changerate
< crate_nodecr
&& (
510 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
511 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)) {
512 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
513 cn_lx
->bufsize
.act
.noact
.bytesleft
=
514 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
516 if (rem_changerate
< crate_nofastdecr
&&
517 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
518 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
519 cn_lx
->bufsize
.act
.decr
.size_start
=
520 cn_lx
->bufsize
.bufsize
;
523 if (rem_changerate
> crate_noincr
&& (
524 cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
525 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)) {
526 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
527 cn_lx
->bufsize
.act
.noact
.bytesleft
=
528 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
530 if (rem_changerate
> crate_nofastincr
&&
531 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
532 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
533 cn_lx
->bufsize
.act
.incr
.size_start
=
534 cn_lx
->bufsize
.bufsize
;
538 _cor_bufsize_update(cn_lx
, rcvd
, high_latency_sender
,
/*
 * cor_bufsize_read_to_sock() - track how long userspace has left data
 * unread on a socket-target conn.  If it has been waiting longer than the
 * (latency-class dependent) limit, suppress low-buffer shrinking for one
 * full bufsize worth of data via ignore_rcv_lowbuf; then update the
 * waiting_for_userspace flag/timestamp from the current fill level.
 *
 * NOTE(review): this listing is incomplete — the two latency_limit values
 * of the ternary (original line 546), the else keyword and the trailing
 * jiffies_tmp assignment (line 560) are missing; the surviving lines are
 * kept exactly as extracted.
 */
542 void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
544 unsigned long jiffies_tmp = jiffies;
/* limit depends on is_highlatency; the two constants are missing here */
545 __u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
548 if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
549 trgt_sock_lx->target.sock.waiting_for_userspace_since,
550 jiffies - latency_limit)) {
/* userspace is slow: ignore low-buffer signals for one bufsize of data */
551 trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
552 trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
555 if (trgt_sock_lx->data_buf.read_remaining == 0) {
556 trgt_sock_lx->target.sock.waiting_for_userspace = 0;
558 trgt_sock_lx->target.sock.waiting_for_userspace = 1;
559 trgt_sock_lx->target.sock.waiting_for_userspace_since =
564 static inline void cor_databuf_item_unlink(struct cor_conn
*cn_lx
,
565 struct cor_data_buf_item
*item
)
567 BUG_ON(item
== cn_lx
->data_buf
.nextread
);
568 list_del(&(item
->buf_list
));
569 if (item
->type
== DATABUF_BUF
) {
570 cn_lx
->data_buf
.overhead
-= sizeof(struct cor_data_buf_item
) +
571 item
->buflen
- item
->datalen
;
572 } else if (item
->type
== DATABUF_SKB
) {
573 cn_lx
->data_buf
.overhead
-= sizeof(struct sk_buff
);
579 void cor_databuf_ackdiscard(struct cor_conn
*cn_lx
)
583 cn_lx
->data_buf
.next_read_offset
= 0;
584 cn_lx
->data_buf
.nextread
= 0;
586 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
587 struct cor_data_buf_item
*item
= container_of(
588 cn_lx
->data_buf
.items
.next
,
589 struct cor_data_buf_item
, buf_list
);
590 freed
+= item
->datalen
;
592 cor_databuf_item_unlink(cn_lx
, item
);
593 cor_databuf_item_free(item
);
596 cn_lx
->data_buf
.datasize
-= freed
;
597 cn_lx
->data_buf
.first_offset
+= freed
;
599 BUG_ON(cn_lx
->data_buf
.datasize
!= 0);
600 BUG_ON(cn_lx
->data_buf
.overhead
!= 0);
602 cn_lx
->data_buf
.read_remaining
= 0;
605 void cor_reset_seqno(struct cor_conn
*cn_l
, __u64 initseqno
)
607 cn_l
->data_buf
.first_offset
= initseqno
-
608 cn_l
->data_buf
.datasize
+
609 cn_l
->data_buf
.read_remaining
;
612 static void cor_databuf_nextreadchunk(struct cor_conn
*cn_lx
)
614 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
615 BUG_ON(cn_lx
->data_buf
.next_read_offset
!=
616 cn_lx
->data_buf
.nextread
->datalen
);
618 if (&(cn_lx
->data_buf
.nextread
->buf_list
) ==
619 cn_lx
->data_buf
.items
.prev
) {
620 BUG_ON(cn_lx
->data_buf
.read_remaining
!= 0);
621 cn_lx
->data_buf
.nextread
= 0;
624 BUG_ON(cn_lx
->data_buf
.read_remaining
== 0);
625 cn_lx
->data_buf
.nextread
= container_of(
626 cn_lx
->data_buf
.nextread
->buf_list
.next
,
627 struct cor_data_buf_item
, buf_list
);
630 cn_lx
->data_buf
.next_read_offset
= 0;
633 void cor_databuf_pull(struct cor_conn
*cn_lx
, char *dst
, __u32 len
)
635 BUG_ON(cn_lx
->data_buf
.read_remaining
< len
);
640 char *srcbufcpystart
= 0;
641 int srcbufcpylen
= 0;
643 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
644 BUG_ON(cn_lx
->data_buf
.next_read_offset
>=
645 cn_lx
->data_buf
.nextread
->datalen
);
647 srcbufcpystart
= cn_lx
->data_buf
.nextread
->buf
+
648 cn_lx
->data_buf
.next_read_offset
;
649 srcbufcpylen
= cn_lx
->data_buf
.nextread
->datalen
-
650 cn_lx
->data_buf
.next_read_offset
;
652 if (cpy
> srcbufcpylen
)
655 memcpy(dst
, srcbufcpystart
, cpy
);
660 cn_lx
->data_buf
.read_remaining
-= cpy
;
661 cn_lx
->data_buf
.next_read_offset
+= cpy
;
663 if (cpy
== srcbufcpylen
)
664 cor_databuf_nextreadchunk(cn_lx
);
668 void cor_databuf_unpull_dpi(struct cor_conn
*trgt_sock
, struct cor_sock
*cs
,
669 struct cor_data_buf_item
*item
, __u16 next_read_offset
)
671 BUG_ON(next_read_offset
> item
->datalen
);
673 if (next_read_offset
>= item
->datalen
)
676 spin_lock_bh(&(trgt_sock
->rcv_lock
));
678 if (unlikely(cor_is_trgt_sock(trgt_sock
, cs
) == 0)) {
679 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
683 BUG_ON(trgt_sock
->data_buf
.nextread
!= 0 &&
684 &(trgt_sock
->data_buf
.nextread
->buf_list
) !=
685 trgt_sock
->data_buf
.items
.next
);
686 BUG_ON(trgt_sock
->data_buf
.next_read_offset
!= 0);
688 trgt_sock
->data_buf
.first_offset
-= item
->datalen
;
689 trgt_sock
->data_buf
.datasize
+= item
->datalen
;
690 trgt_sock
->data_buf
.read_remaining
+= item
->datalen
- next_read_offset
;
692 if (item
->type
== DATABUF_BUF
) {
693 trgt_sock
->data_buf
.overhead
+=
694 sizeof(struct cor_data_buf_item
) +
695 item
->buflen
- item
->datalen
;
696 } else if (item
->type
== DATABUF_SKB
) {
697 trgt_sock
->data_buf
.overhead
+= sizeof(struct sk_buff
);
702 list_add(&(item
->buf_list
), &(trgt_sock
->data_buf
.items
));
704 trgt_sock
->data_buf
.nextread
= item
;
705 trgt_sock
->data_buf
.next_read_offset
= next_read_offset
;
707 cor_account_bufspace(trgt_sock
);
709 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
713 cor_databuf_item_free(item
);
717 void cor_databuf_pull_dbi(struct cor_sock
*cs_rl
, struct cor_conn
*trgt_sock_l
)
719 struct cor_data_buf_item
*dbi
= 0;
720 BUG_ON(cs_rl
->type
!= CS_TYPE_CONN_RAW
);
721 BUG_ON(cs_rl
->data
.conn_raw
.rcvitem
!= 0);
723 if (trgt_sock_l
->data_buf
.read_remaining
== 0)
726 BUG_ON(trgt_sock_l
->data_buf
.nextread
== 0);
727 BUG_ON(trgt_sock_l
->data_buf
.next_read_offset
>=
728 trgt_sock_l
->data_buf
.nextread
->datalen
);
729 dbi
= trgt_sock_l
->data_buf
.nextread
;
731 BUG_ON(&(dbi
->buf_list
) != trgt_sock_l
->data_buf
.items
.next
);
733 cs_rl
->data
.conn_raw
.rcvitem
= dbi
;
734 cs_rl
->data
.conn_raw
.rcvoffset
= trgt_sock_l
->data_buf
.next_read_offset
;
736 trgt_sock_l
->data_buf
.first_offset
+= dbi
->datalen
;
737 trgt_sock_l
->data_buf
.datasize
-= dbi
->datalen
;
738 trgt_sock_l
->data_buf
.read_remaining
-= dbi
->datalen
;
740 cor_account_bufspace(trgt_sock_l
);
742 trgt_sock_l
->data_buf
.next_read_offset
= dbi
->datalen
;
743 cor_databuf_nextreadchunk(trgt_sock_l
);
745 cor_databuf_item_unlink(trgt_sock_l
, dbi
);
748 void cor_databuf_unpull(struct cor_conn
*trgt_out_l
, __u32 bytes
)
750 trgt_out_l
->data_buf
.read_remaining
+= bytes
;
752 BUG_ON(list_empty(&(trgt_out_l
->data_buf
.items
)) != 0);
754 if (trgt_out_l
->data_buf
.nextread
== 0) {
755 BUG_ON(trgt_out_l
->data_buf
.next_read_offset
!= 0);
757 trgt_out_l
->data_buf
.nextread
= container_of(
758 trgt_out_l
->data_buf
.items
.prev
,
759 struct cor_data_buf_item
, buf_list
);
762 while (bytes
> trgt_out_l
->data_buf
.next_read_offset
) {
763 bytes
-= trgt_out_l
->data_buf
.next_read_offset
;
764 trgt_out_l
->data_buf
.nextread
= container_of(
765 trgt_out_l
->data_buf
.nextread
->buf_list
.prev
,
766 struct cor_data_buf_item
, buf_list
);
767 BUG_ON(&(trgt_out_l
->data_buf
.nextread
->buf_list
) ==
768 &(trgt_out_l
->data_buf
.items
));
769 trgt_out_l
->data_buf
.next_read_offset
=
770 trgt_out_l
->data_buf
.nextread
->datalen
;
773 trgt_out_l
->data_buf
.next_read_offset
-= bytes
;
776 void cor_databuf_pullold(struct cor_conn
*trgt_out_l
, __u64 startpos
, char *dst
,
779 __u64 pos
= trgt_out_l
->data_buf
.first_offset
;
780 struct cor_data_buf_item
*dbi
= container_of(
781 trgt_out_l
->data_buf
.items
.next
,
782 struct cor_data_buf_item
, buf_list
);
785 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
787 if (cor_seqno_after(pos
+ dbi
->datalen
, startpos
))
791 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
798 char *srcbufcpystart
= 0;
799 int srcbufcpylen
= 0;
801 __u64 offset
= cor_seqno_clean(startpos
- pos
);
803 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
805 BUG_ON(cor_seqno_before(startpos
, pos
));
806 BUG_ON(offset
> dbi
->datalen
);
808 srcbufcpystart
= dbi
->buf
+ offset
;
809 srcbufcpylen
= dbi
->datalen
- offset
;
811 if (cpy
> srcbufcpylen
)
814 memcpy(dst
, srcbufcpystart
, cpy
);
821 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
826 /* ack up to *not* including pos */
827 void cor_databuf_ack(struct cor_conn
*trgt_out_l
, __u64 pos
)
831 while (!list_empty(&(trgt_out_l
->data_buf
.items
))) {
832 struct cor_data_buf_item
*firstitem
= container_of(
833 trgt_out_l
->data_buf
.items
.next
,
834 struct cor_data_buf_item
, buf_list
);
836 if (firstitem
== trgt_out_l
->data_buf
.nextread
)
839 if (cor_seqno_after_eq(trgt_out_l
->data_buf
.first_offset
+
840 firstitem
->datalen
, pos
))
843 trgt_out_l
->data_buf
.first_offset
+= firstitem
->datalen
;
844 acked
+= firstitem
->datalen
;
846 cor_databuf_item_unlink(trgt_out_l
, firstitem
);
847 cor_databuf_item_free(firstitem
);
850 trgt_out_l
->data_buf
.datasize
-= acked
;
852 BUG_ON(trgt_out_l
->data_buf
.datasize
== 0 &&
853 trgt_out_l
->data_buf
.overhead
!= 0);
855 if (unlikely(trgt_out_l
->target
.out
.nblist_busy_remaining
<= acked
)) {
856 trgt_out_l
->target
.out
.nblist_busy_remaining
= 0;
857 cor_conn_set_last_act(trgt_out_l
);
859 trgt_out_l
->target
.out
.nblist_busy_remaining
-= acked
;
863 cor_account_bufspace(trgt_out_l
);
866 void cor_databuf_ackread(struct cor_conn
*cn_lx
)
870 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
871 struct cor_data_buf_item
*firstitem
= container_of(
872 cn_lx
->data_buf
.items
.next
,
873 struct cor_data_buf_item
, buf_list
);
875 if (firstitem
== cn_lx
->data_buf
.nextread
)
878 acked
+= firstitem
->datalen
;
880 cor_databuf_item_unlink(cn_lx
, firstitem
);
881 cor_databuf_item_free(firstitem
);
884 cn_lx
->data_buf
.datasize
-= acked
;
885 cn_lx
->data_buf
.first_offset
+= acked
;
887 BUG_ON(cn_lx
->data_buf
.datasize
== 0 && cn_lx
->data_buf
.overhead
!= 0);
889 if (cn_lx
->targettype
== TARGET_OUT
) {
890 if (unlikely(cn_lx
->target
.out
.nblist_busy_remaining
<=
892 cn_lx
->target
.out
.nblist_busy_remaining
= 0;
893 cor_conn_set_last_act(cn_lx
);
895 cn_lx
->target
.out
.nblist_busy_remaining
-= acked
;
900 cor_account_bufspace(cn_lx
);
903 __u32
_cor_receive_buf(struct cor_conn
*cn_lx
, char *buf
, __u32 datalen
,
904 int from_sock
, __u8 windowused
, __u8 flush
)
906 struct cor_data_buf_item
*item
= 0;
910 if (list_empty(&(cn_lx
->data_buf
.items
)) == 0) {
911 struct list_head
*last
= cn_lx
->data_buf
.items
.prev
;
912 item
= container_of(last
, struct cor_data_buf_item
, buf_list
);
915 while (datalen
> 0) {
918 BUG_ON(cn_lx
->data_buf
.datasize
+ datalen
> (1 << 30));
919 BUG_ON(cn_lx
->data_buf
.overhead
> (1 << 30));
921 if (item
== 0 || item
->type
!= DATABUF_BUF
||
922 item
->buflen
<= item
->datalen
) {
923 item
= kmem_cache_alloc(cor_data_buf_item_slab
,
925 if (unlikely(item
== 0))
928 memset(item
, 0, sizeof(struct cor_data_buf_item
));
929 item
->type
= DATABUF_BUF
;
931 item
->buflen
= cor_buf_optlen(datalen
, from_sock
);
932 item
->buf
= kmalloc(item
->buflen
, GFP_ATOMIC
);
934 if (unlikely(item
->buf
== 0)) {
935 kmem_cache_free(cor_data_buf_item_slab
, item
);
940 list_add_tail(&(item
->buf_list
),
941 &(cn_lx
->data_buf
.items
));
943 cn_lx
->data_buf
.overhead
+= item
->buflen
+
944 sizeof(struct cor_data_buf_item
);
947 BUG_ON(item
->type
!= DATABUF_BUF
);
948 BUG_ON(item
->buflen
<= item
->datalen
);
950 if (cn_lx
->data_buf
.nextread
== 0) {
951 cn_lx
->data_buf
.nextread
= item
;
952 cn_lx
->data_buf
.next_read_offset
= item
->datalen
;
955 if (item
->buflen
- item
->datalen
< cpy
)
956 cpy
= (item
->buflen
- item
->datalen
);
958 memcpy(item
->buf
+ item
->datalen
, buf
, cpy
);
959 item
->datalen
+= cpy
;
961 BUG_ON(cpy
> datalen
);
966 cn_lx
->data_buf
.read_remaining
+= cpy
;
967 cn_lx
->data_buf
.datasize
+= cpy
;
968 cn_lx
->data_buf
.overhead
-= cpy
;
969 BUG_ON(cn_lx
->data_buf
.datasize
!= 0 &&
970 cn_lx
->data_buf
.overhead
== 0);
975 cn_lx
->flush
= flush
;
977 cor_account_bufspace(cn_lx
);
978 cor_bufsize_update(cn_lx
, totalcpy
, windowused
, flush
);
983 __u32
cor_receive_skb(struct cor_conn
*src_in_l
, struct sk_buff
*skb
,
984 __u8 windowused
, __u8 flush
)
986 struct cor_skb_procstate
*ps
= cor_skb_pstate(skb
);
987 struct cor_data_buf_item
*item
= &(ps
->funcstate
.rcv
.dbi
);
989 __u32 bufferleft
= 0;
991 BUG_ON(skb
->len
<= 0);
993 if (unlikely(unlikely(src_in_l
->data_buf
.datasize
+ skb
->len
>
994 (1 << 30)) || unlikely(src_in_l
->data_buf
.overhead
>
998 if (list_empty(&(src_in_l
->data_buf
.items
)) == 0) {
999 struct list_head
*last
= src_in_l
->data_buf
.items
.prev
;
1000 struct cor_data_buf_item
*item
= container_of(last
,
1001 struct cor_data_buf_item
, buf_list
);
1002 bufferleft
= item
->buflen
- item
->datalen
;
1005 if (skb
->len
< (sizeof(struct sk_buff
) + bufferleft
)) {
1006 __u32 rc
= cor_receive_buf(src_in_l
, skb
->data
, skb
->len
,
1008 if (likely(rc
== skb
->len
))
1013 memset(item
, 0, sizeof(struct cor_data_buf_item
));
1015 item
->type
= DATABUF_SKB
;
1016 item
->buf
= skb
->data
;
1017 item
->datalen
= skb
->len
;
1018 item
->buflen
= item
->datalen
;
1019 list_add_tail(&(item
->buf_list
), &(src_in_l
->data_buf
.items
));
1020 if (src_in_l
->data_buf
.nextread
== 0)
1021 src_in_l
->data_buf
.nextread
= item
;
1023 src_in_l
->data_buf
.read_remaining
+= item
->datalen
;
1024 src_in_l
->data_buf
.datasize
+= item
->datalen
;
1025 src_in_l
->data_buf
.overhead
+= sizeof(struct sk_buff
);
1027 cor_account_bufspace(src_in_l
);
1028 cor_bufsize_update(src_in_l
, skb
->len
, windowused
, flush
);
1030 src_in_l
->flush
= flush
;
1035 void cor_wake_sender(struct cor_conn
*cn
)
1037 spin_lock_bh(&(cn
->rcv_lock
));
1039 if (unlikely(cn
->isreset
)) {
1040 spin_unlock_bh(&(cn
->rcv_lock
));
1044 switch (cn
->sourcetype
) {
1045 case SOURCE_UNCONNECTED
:
1046 spin_unlock_bh(&(cn
->rcv_lock
));
1047 spin_lock_bh(&(cor_get_conn_reversedir(cn
)->rcv_lock
));
1048 if (likely(cor_get_conn_reversedir(cn
)->isreset
== 0 &&
1049 cor_get_conn_reversedir(cn
)->targettype
==
1050 TARGET_UNCONNECTED
))
1051 cor_proc_cpacket(cor_get_conn_reversedir(cn
));
1052 spin_unlock_bh(&(cor_get_conn_reversedir(cn
)->rcv_lock
));
1055 if (_cor_mngdsocket_flushtoconn(cn
) == RC_FTC_OK
&&
1056 cn
->source
.sock
.ed
->cs
!= 0 &&
1057 cor_sock_sndbufavailable(cn
, 1))
1058 cor_sk_write_space(cn
->source
.sock
.ed
->cs
);
1059 spin_unlock_bh(&(cn
->rcv_lock
));
1062 cor_drain_ooo_queue(cn
);
1063 if (likely(cn
->source
.in
.established
!= 0)) {
1064 cor_send_ack_conn_ifneeded(cn
, 0, 0);
1066 spin_unlock_bh(&(cn
->rcv_lock
));
1073 int __init
cor_forward_init(void)
1075 cor_data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
1076 sizeof(struct cor_data_buf_item
), 8, 0, 0);
1077 if (unlikely(cor_data_buf_item_slab
== 0))
1080 atomic64_set(&cor_bufused_sum
, 0);
1085 void __exit
cor_forward_exit2(void)
1087 BUG_ON(atomic64_read(&cor_bufused_sum
) != 0);
1089 kmem_cache_destroy(cor_data_buf_item_slab
);
1090 cor_data_buf_item_slab
= 0;
1093 MODULE_LICENSE("GPL");