/*
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
21 #include <linux/mutex.h>
25 struct kmem_cache
*cor_data_buf_item_slab
;
27 atomic64_t cor_bufused_sum
;
/*
__u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
}
*/
34 void cor_databuf_init(struct cor_conn
*cn_init
)
36 memset(&(cn_init
->data_buf
), 0, sizeof(cn_init
->data_buf
));
37 INIT_LIST_HEAD(&(cn_init
->data_buf
.items
));
40 void cor_bufsize_init(struct cor_conn
*cn_l
, __u32 bufsize
)
42 __u32 bufsize_shifted
;
44 memset(&(cn_l
->bufsize
), 0, sizeof(cn_l
->bufsize
));
46 if (unlikely((bufsize
>> (32 - BUFSIZE_SHIFT
)) != 0))
47 bufsize_shifted
= U32_MAX
;
49 bufsize_shifted
= bufsize
<< 5;
51 cn_l
->bufsize
.bufsize
= bufsize_shifted
;
52 cn_l
->bufsize
.state
= BUFSIZE_NOACTION
;
53 cn_l
->bufsize
.act
.noact
.bytesleft
= bufsize
* 4;
#warning todo reset stalled conns on low memory
/*
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset connection which has been in this list longest
 */
61 int cor_account_bufspace(struct cor_conn
*cn_lx
)
63 __u64 space_needed
= 0;
65 __u64 bufused_sum_int
;
67 if (likely(cn_lx
->isreset
== 0)) {
68 space_needed
+= cn_lx
->data_buf
.datasize
;
69 space_needed
+= cn_lx
->data_buf
.overhead
;
70 if (cn_lx
->sourcetype
== SOURCE_IN
) {
71 space_needed
+= cn_lx
->source
.in
.reorder_memused
;
75 if (cn_lx
->bufspace_accounted
== space_needed
) {
76 if (space_needed
>= BUFUSAGE_PER_CONN_MAX
) {
78 } else if (atomic64_read(&cor_bufused_sum
) >=
79 BUFUSAGE_GLOBAL_MAX
) {
86 if (unlikely(space_needed
>= U32_MAX
))
89 space_req
= space_needed
;
91 if (cn_lx
->bufspace_accounted
== space_req
)
94 bufused_sum_int
= cor_update_atomic_sum(&cor_bufused_sum
,
95 cn_lx
->bufspace_accounted
, space_req
);
97 cn_lx
->bufspace_accounted
= space_req
;
99 if (space_needed
!= space_req
)
101 else if (bufused_sum_int
>= BUFUSAGE_GLOBAL_MAX
)
103 else if (space_needed
>= BUFUSAGE_PER_CONN_MAX
)
109 int cor_cpacket_write_allowed(struct cor_conn
*src_unconn_lx
)
111 BUG_ON(src_unconn_lx
->sourcetype
!= SOURCE_UNCONNECTED
);
113 if (src_unconn_lx
->data_buf
.datasize
== 0)
115 else if (src_unconn_lx
->data_buf
.datasize
< BUFFERLIMIT_CPACKETS
&&
116 cor_account_bufspace(src_unconn_lx
) == 0)
122 void cor_update_windowlimit(struct cor_conn
*src_in_lx
)
126 BUG_ON(src_in_lx
->sourcetype
!= SOURCE_IN
);
128 bufsize
= src_in_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
130 if (src_in_lx
->targettype
!= TARGET_OUT
) {
131 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
132 cor_account_bufspace(src_in_lx
)) {
133 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
135 } else if (cor_seqno_before_eq(src_in_lx
->target
.out
.seqno_windowlimit
,
136 src_in_lx
->target
.out
.seqno_nextsend
)) {
137 if (cor_account_bufspace(src_in_lx
)) {
138 bufsize
= min(bufsize
, (__u32
) WINDOW_MAX_PER_CONN_MIN
);
141 __u32 windowleft
= (__u32
) min((__u64
) U32_MAX
,
143 src_in_lx
->target
.out
.seqno_windowlimit
-
144 src_in_lx
->target
.out
.seqno_nextsend
));
146 bufsize
= max(bufsize
, min(windowleft
,
147 (__u32
) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK
));
149 if (bufsize
< WINDOW_MAX_PER_CONN_MIN
||
150 cor_account_bufspace(src_in_lx
)) {
151 bufsize
= WINDOW_MAX_PER_CONN_MIN
;
155 if (bufsize
> WINDOW_MAX_PER_CONN_MAX
)
156 bufsize
= WINDOW_MAX_PER_CONN_MAX
;
158 /* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */
160 if (unlikely(src_in_lx
->data_buf
.read_remaining
> bufsize
))
163 bufsize
-= src_in_lx
->data_buf
.read_remaining
;
165 if (unlikely(src_in_lx
->targettype
== TARGET_DISCARD
))
168 src_in_lx
->source
.in
.window_seqnolimit
=
169 src_in_lx
->source
.in
.next_seqno
+ bufsize
;
172 static int cor_bufsize_high_latency_sender(struct cor_conn
*cn_lx
)
174 struct cor_neighbor
*nb
;
178 if (cn_lx
->sourcetype
!= SOURCE_IN
)
181 nb
= cn_lx
->source
.in
.nb
;
182 if (unlikely(nb
== 0))
185 latency_us
= atomic_read(&(nb
->latency_retrans_us
));
186 latency_us
+= atomic_read(&(nb
->latency_stddev_retrans_us
));
187 latency_us
+= CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS
* 1000;
189 return latency_us
> 100000 ? 1 : 0;
/*
 * Computes the current buffer-size change rate as a __u8 biased by -64
 * (128 encodes "no change"; below 128 shrinking, above 128 growing).
 * NOTE(review): this span is extraction-garbled — original source line
 * numbers are fused into the text and several lines (the `speed` variable
 * declaration and its per-state values, some branch bodies) are missing.
 * Do not compile as-is; restore from the upstream source.
 */
192 __u8
_cor_bufsize_update_get_changerate(struct cor_conn
*cn_lx
)
194 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
195 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
/* state machine: NOACTION keeps changerate at its default;
 * DECR/DECR_FAST report 128 - speed, INCR/INCR_FAST report 128 + speed.
 * The speed values themselves are in the missing lines. */
199 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
201 } else if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
202 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
205 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
207 if (high_latency_sender
)
209 if (high_latency_conn
)
212 changerate
= 128 - speed
;
213 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
214 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
217 if (high_latency_sender
)
220 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
222 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
225 changerate
= 128 + speed
;
230 /* printk(KERN_ERR "changerate1 %u", changerate); */
/* combine with the rate reported by the remote side (TARGET_OUT only) */
232 if (cn_lx
->targettype
== TARGET_OUT
) {
233 __u16 remote_changerate
= ((__u16
)
234 cn_lx
->target
.out
.remote_bufsize_changerate
) +
236 /* printk(KERN_ERR "changerate2 %u", remote_changerate); */
237 changerate
= (changerate
* remote_changerate
) / 128;
238 /* printk(KERN_ERR "changerate3 %u", changerate); */
/* clamp into the encodable range before removing the bias of 64 */
241 if (unlikely(changerate
< 64))
243 else if (unlikely(changerate
- 64 >= 256))
246 return (__u8
) (changerate
- 64);
/*
 * Core of the bufsize state machine: consumes `rcvd` freshly received bytes
 * and grows/shrinks cn_lx->bufsize.bufsize according to the current state
 * (NOACTION -> DECR -> DECR_FAST, or INCR -> INCR_FAST -> NOACTION).
 * NOTE(review): extraction-garbled span — original line numbers are fused
 * into the text and several lines (the `speed`/`change` declarations and
 * per-state speed values, the shift constants at lines 320-325, various
 * closing braces) are missing. Do not compile as-is; restore upstream.
 */
249 static void _cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
250 int high_latency_sender
, int high_latency_conn
)
252 /* speed of increase/decrease changes with BUFSIZE_SHIFT */
253 BUG_ON(BUFSIZE_SHIFT
!= 5);
256 * If you change the speed here, change it in
257 * _cor_bufsize_update_get_changerate too
/* NOACTION: consume the bytesleft budget; when exhausted, switch to DECR */
260 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
261 if (likely(cn_lx
->bufsize
.act
.noact
.bytesleft
>= rcvd
)) {
262 cn_lx
->bufsize
.act
.noact
.bytesleft
-= rcvd
;
266 rcvd
-= cn_lx
->bufsize
.act
.noact
.bytesleft
;
267 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
268 cn_lx
->bufsize
.act
.decr
.size_start
= cn_lx
->bufsize
.bufsize
;
/* DECR/DECR_FAST: shrink proportionally to rcvd, clamped at zero;
 * escalate to DECR_FAST once below a quarter of the starting size */
271 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
272 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
276 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)
279 if (high_latency_sender
)
282 change
= rcvd
* speed
;
283 if (high_latency_conn
)
286 if (cn_lx
->bufsize
.bufsize
< change
)
287 cn_lx
->bufsize
.bufsize
= 0;
289 cn_lx
->bufsize
.bufsize
-= change
;
291 if (cn_lx
->bufsize
.act
.decr
.size_start
/4 >
292 cn_lx
->bufsize
.bufsize
)
293 cn_lx
->bufsize
.state
= BUFSIZE_DECR_FAST
;
/* INCR/INCR_FAST: grow with overflow saturation at U32_MAX until
 * size_end is reached, then fall back to NOACTION with a fresh budget */
294 } else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
295 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
299 if (high_latency_sender
)
302 if (unlikely(cor_bufsize_initial_phase(cn_lx
)))
304 else if (cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)
307 change
= rcvd
* speed
;
308 if (cn_lx
->bufsize
.bufsize
+ change
< cn_lx
->bufsize
.bufsize
)
309 cn_lx
->bufsize
.bufsize
= U32_MAX
;
311 cn_lx
->bufsize
.bufsize
+= change
;
313 if (cn_lx
->bufsize
.bufsize
>=
314 cn_lx
->bufsize
.act
.incr
.size_end
) {
315 cn_lx
->bufsize
.bufsize
=
316 cn_lx
->bufsize
.act
.incr
.size_end
;
317 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
318 if (high_latency_conn
) {
319 cn_lx
->bufsize
.act
.noact
.bytesleft
=
320 (cn_lx
->bufsize
.bufsize
>>
323 cn_lx
->bufsize
.act
.noact
.bytesleft
=
324 (cn_lx
->bufsize
.bufsize
>>
/* saturating byte counter, capped below 1 << 24 */
332 if (unlikely(rcvd
>= (1 << 24)) ||
333 cn_lx
->bufsize
.bytes_rcvd
+ rcvd
>= (1 << 24))
334 cn_lx
->bufsize
.bytes_rcvd
= (1 << 24) - 1;
336 cn_lx
->bufsize
.bytes_rcvd
+= rcvd
;
339 static __u32
cor_get_read_remaining_min(__u32 bufsize_bytes
,
340 int high_latency_sender
, int high_latency_conn
)
342 int bufspace_low
= (atomic64_read(&cor_bufused_sum
) >=
343 3*(BUFUSAGE_GLOBAL_MAX
/4));
345 if (high_latency_conn
) {
346 if (high_latency_sender
) {
348 return bufsize_bytes
/6 + 1;
350 return bufsize_bytes
/3 + 1;
354 return bufsize_bytes
/8 + 1;
356 return bufsize_bytes
/4 + 1;
360 if (high_latency_sender
) {
362 return bufsize_bytes
/6 + 1;
364 return bufsize_bytes
/4 + 1;
368 return bufsize_bytes
/12 + 1;
370 return bufsize_bytes
/8 + 1;
/*
 * Driver of the bufsize state machine: reacts to flush notifications, the
 * rcv_delayed_lowbuf signal and the remote change rate before delegating to
 * _cor_bufsize_update().
 * NOTE(review): extraction-garbled span — original line numbers are fused
 * into the text and several lines are missing (declarations of buf_increase/
 * bufsize_end/bytesleft/rtt_mul, the rtt_mul values, the shift constants in
 * the max() at lines ~502, closing braces, and the final argument of the
 * _cor_bufsize_update call). Do not compile as-is; restore upstream.
 */
376 static void cor_bufsize_update(struct cor_conn
*cn_lx
, __u32 rcvd
,
377 int rcv_delayed_lowbuf
, __u8 rcv_flushrcvd
)
379 int high_latency_sender
= cor_bufsize_high_latency_sender(cn_lx
);
380 int high_latency_conn
= (cn_lx
->is_highlatency
!= 0);
381 __u32 bufsize_bytes
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
382 __u32 read_remaining_min
= cor_get_read_remaining_min(bufsize_bytes
,
383 high_latency_sender
, high_latency_conn
);
384 __u32 read_remaining_min_nofastdecr
= read_remaining_min
*2;
385 __u32 read_remaining_min_nodecr
= (read_remaining_min
+
386 read_remaining_min_nofastdecr
)/2;
387 __u32 read_remaining
= cn_lx
->data_buf
.read_remaining
- rcvd
;
389 BUG_ON(cn_lx
->data_buf
.read_remaining
< rcvd
);
/* a received flush cancels any running decrease */
391 if (rcv_flushrcvd
!= 0) {
392 __u32 bytesleft
= (cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
);
393 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
394 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
395 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
396 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
397 } else if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
&&
398 cn_lx
->bufsize
.act
.noact
.bytesleft
<
400 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
/* consume the ignore_rcv_lowbuf budget before honoring lowbuf signals */
404 if (cn_lx
->bufsize
.ignore_rcv_lowbuf
> 0) {
405 rcv_delayed_lowbuf
= 0;
406 if (rcvd
> cn_lx
->bufsize
.ignore_rcv_lowbuf
)
407 cn_lx
->bufsize
.ignore_rcv_lowbuf
= 0;
409 cn_lx
->bufsize
.ignore_rcv_lowbuf
-= rcvd
;
/* backlog below the minimum: schedule an increase */
412 if (rcv_delayed_lowbuf
&& read_remaining
< read_remaining_min
) {
413 __u32 buf_increase_bytes
= read_remaining_min
- read_remaining
;
417 if (high_latency_sender
) {
418 if (buf_increase_bytes
< rcvd
/16)
419 buf_increase_bytes
= rcvd
/16;
421 if (buf_increase_bytes
< rcvd
/32)
422 buf_increase_bytes
= rcvd
/32;
425 buf_increase
= (buf_increase_bytes
<< BUFSIZE_SHIFT
);
426 if (unlikely((buf_increase
>> BUFSIZE_SHIFT
) !=
428 buf_increase
= U32_MAX
;
430 bufsize_end
= cn_lx
->bufsize
.bufsize
+ buf_increase
;
431 if (unlikely(bufsize_end
< cn_lx
->bufsize
.bufsize
))
432 bufsize_end
= U32_MAX
;
434 if (cn_lx
->bufsize
.state
!= BUFSIZE_INCR
&&
435 cn_lx
->bufsize
.state
!= BUFSIZE_INCR_FAST
) {
436 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
437 cn_lx
->bufsize
.act
.incr
.size_start
=
438 cn_lx
->bufsize
.bufsize
;
439 cn_lx
->bufsize
.act
.incr
.size_end
= 0;
442 if (bufsize_end
> cn_lx
->bufsize
.act
.incr
.size_end
)
443 cn_lx
->bufsize
.act
.incr
.size_end
= bufsize_end
;
444 if (bufsize_end
/4 > cn_lx
->bufsize
.act
.incr
.size_start
)
445 cn_lx
->bufsize
.state
= BUFSIZE_INCR_FAST
;
/* backlog below the no-decrease threshold: hold at NOACTION */
446 } else if (rcv_delayed_lowbuf
&&
447 read_remaining
< read_remaining_min_nodecr
) {
448 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
||
449 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
450 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
453 if (cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
)
454 bytesleft
= cn_lx
->bufsize
.act
.noact
.bytesleft
;
455 if (high_latency_conn
)
457 if (likely((cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
) *
458 rtt_mul
> bytesleft
))
459 bytesleft
= (cn_lx
->bufsize
.bufsize
>>
460 BUFSIZE_SHIFT
) * rtt_mul
;
462 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
463 cn_lx
->bufsize
.act
.noact
.bytesleft
= bytesleft
;
/* backlog below the no-fast-decrease threshold: demote DECR_FAST */
465 } else if (rcv_delayed_lowbuf
&&
466 read_remaining
< read_remaining_min_nofastdecr
) {
467 if (cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
468 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
469 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
470 cn_lx
->bufsize
.act
.decr
.size_start
=
471 cn_lx
->bufsize
.bufsize
;
/* moderate local action if the remote change rate disagrees */
475 if (cn_lx
->targettype
== TARGET_OUT
) {
476 __u16 rem_changerate
= ((__u16
)
477 cn_lx
->target
.out
.remote_bufsize_changerate
) +
479 __u16 crate_nofastdecr
;
481 __u16 crate_nofastincr
;
484 if (high_latency_conn
) {
485 crate_nofastdecr
= 128 - 128/8;
486 crate_nodecr
= 128 - 128/6;
487 crate_nofastincr
= 128 + 128/4;
488 crate_noincr
= 128 + 128/3;
490 crate_nofastdecr
= 128 - 128/16;
491 crate_nodecr
= 128 - 128/12;
492 crate_nofastincr
= 128 + 128/8;
493 crate_noincr
= 128 + 128/6;
496 if ((rem_changerate
< crate_nodecr
||
497 rem_changerate
> crate_noincr
) &&
498 cn_lx
->bufsize
.state
== BUFSIZE_NOACTION
) {
499 cn_lx
->bufsize
.act
.noact
.bytesleft
= max(
500 cn_lx
->bufsize
.act
.noact
.bytesleft
,
501 cn_lx
->bufsize
.bufsize
>>
505 if (rem_changerate
< crate_nodecr
&& (
506 cn_lx
->bufsize
.state
== BUFSIZE_DECR
||
507 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
)) {
508 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
509 cn_lx
->bufsize
.act
.noact
.bytesleft
=
510 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
512 if (rem_changerate
< crate_nofastdecr
&&
513 cn_lx
->bufsize
.state
== BUFSIZE_DECR_FAST
) {
514 cn_lx
->bufsize
.state
= BUFSIZE_DECR
;
515 cn_lx
->bufsize
.act
.decr
.size_start
=
516 cn_lx
->bufsize
.bufsize
;
519 if (rem_changerate
> crate_noincr
&& (
520 cn_lx
->bufsize
.state
== BUFSIZE_INCR
||
521 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
)) {
522 cn_lx
->bufsize
.state
= BUFSIZE_NOACTION
;
523 cn_lx
->bufsize
.act
.noact
.bytesleft
=
524 cn_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
526 if (rem_changerate
> crate_nofastincr
&&
527 cn_lx
->bufsize
.state
== BUFSIZE_INCR_FAST
) {
528 cn_lx
->bufsize
.state
= BUFSIZE_INCR
;
529 cn_lx
->bufsize
.act
.incr
.size_start
=
530 cn_lx
->bufsize
.bufsize
;
534 _cor_bufsize_update(cn_lx
, rcvd
, high_latency_sender
,
/*
 * Tracks whether userspace is keeping up with reads on a TARGET_SOCK conn;
 * if it has been waiting longer than latency_limit, lowbuf signals are
 * suppressed for one full buffer's worth of bytes (ignore_rcv_lowbuf).
 * NOTE(review): extraction-garbled span — the latency_limit constants
 * (line 542), the else branch marker (553) and the assignment of
 * waiting_for_userspace_since (presumably jiffies_tmp, line 556) are
 * missing. Do not compile as-is; restore upstream.
 */
538 void cor_bufsize_read_to_sock(struct cor_conn
*trgt_sock_lx
)
540 unsigned long jiffies_tmp
= jiffies
;
541 __u32 latency_limit
= (trgt_sock_lx
->is_highlatency
!= 0 ?
544 if (trgt_sock_lx
->target
.sock
.waiting_for_userspace
!= 0 && time_before(
545 trgt_sock_lx
->target
.sock
.waiting_for_userspace_since
,
546 jiffies
- latency_limit
)) {
547 trgt_sock_lx
->bufsize
.ignore_rcv_lowbuf
=
548 trgt_sock_lx
->bufsize
.bufsize
>> BUFSIZE_SHIFT
;
551 if (trgt_sock_lx
->data_buf
.read_remaining
== 0) {
552 trgt_sock_lx
->target
.sock
.waiting_for_userspace
= 0;
554 trgt_sock_lx
->target
.sock
.waiting_for_userspace
= 1;
555 trgt_sock_lx
->target
.sock
.waiting_for_userspace_since
=
560 static inline void cor_databuf_item_unlink(struct cor_conn
*cn_lx
,
561 struct cor_data_buf_item
*item
)
563 BUG_ON(item
== cn_lx
->data_buf
.nextread
);
564 list_del(&(item
->buf_list
));
565 if (item
->type
== DATABUF_BUF
) {
566 cn_lx
->data_buf
.overhead
-= sizeof(struct cor_data_buf_item
) +
567 item
->buflen
- item
->datalen
;
568 } else if (item
->type
== DATABUF_SKB
) {
569 cn_lx
->data_buf
.overhead
-= sizeof(struct sk_buff
);
575 void cor_databuf_ackdiscard(struct cor_conn
*cn_lx
)
579 cn_lx
->data_buf
.next_read_offset
= 0;
580 cn_lx
->data_buf
.nextread
= 0;
582 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
583 struct cor_data_buf_item
*item
= container_of(
584 cn_lx
->data_buf
.items
.next
,
585 struct cor_data_buf_item
, buf_list
);
586 freed
+= item
->datalen
;
588 cor_databuf_item_unlink(cn_lx
, item
);
589 cor_databuf_item_free(item
);
592 cn_lx
->data_buf
.datasize
-= freed
;
593 cn_lx
->data_buf
.first_offset
+= freed
;
595 BUG_ON(cn_lx
->data_buf
.datasize
!= 0);
596 BUG_ON(cn_lx
->data_buf
.overhead
!= 0);
598 cn_lx
->data_buf
.read_remaining
= 0;
601 void cor_reset_seqno(struct cor_conn
*cn_l
, __u64 initseqno
)
603 cn_l
->data_buf
.first_offset
= initseqno
-
604 cn_l
->data_buf
.datasize
+
605 cn_l
->data_buf
.read_remaining
;
608 static void cor_databuf_nextreadchunk(struct cor_conn
*cn_lx
)
610 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
611 BUG_ON(cn_lx
->data_buf
.next_read_offset
!=
612 cn_lx
->data_buf
.nextread
->datalen
);
614 if (&(cn_lx
->data_buf
.nextread
->buf_list
) ==
615 cn_lx
->data_buf
.items
.prev
) {
616 BUG_ON(cn_lx
->data_buf
.read_remaining
!= 0);
617 cn_lx
->data_buf
.nextread
= 0;
620 BUG_ON(cn_lx
->data_buf
.read_remaining
== 0);
621 cn_lx
->data_buf
.nextread
= container_of(
622 cn_lx
->data_buf
.nextread
->buf_list
.next
,
623 struct cor_data_buf_item
, buf_list
);
626 cn_lx
->data_buf
.next_read_offset
= 0;
629 void cor_databuf_pull(struct cor_conn
*cn_lx
, char *dst
, __u32 len
)
631 BUG_ON(cn_lx
->data_buf
.read_remaining
< len
);
636 char *srcbufcpystart
= 0;
637 int srcbufcpylen
= 0;
639 BUG_ON(cn_lx
->data_buf
.nextread
== 0);
640 BUG_ON(cn_lx
->data_buf
.next_read_offset
>=
641 cn_lx
->data_buf
.nextread
->datalen
);
643 srcbufcpystart
= cn_lx
->data_buf
.nextread
->buf
+
644 cn_lx
->data_buf
.next_read_offset
;
645 srcbufcpylen
= cn_lx
->data_buf
.nextread
->datalen
-
646 cn_lx
->data_buf
.next_read_offset
;
648 if (cpy
> srcbufcpylen
)
651 memcpy(dst
, srcbufcpystart
, cpy
);
656 cn_lx
->data_buf
.read_remaining
-= cpy
;
657 cn_lx
->data_buf
.next_read_offset
+= cpy
;
659 if (cpy
== srcbufcpylen
)
660 cor_databuf_nextreadchunk(cn_lx
);
664 void cor_databuf_unpull_dpi(struct cor_conn
*trgt_sock
, struct cor_sock
*cs
,
665 struct cor_data_buf_item
*item
, __u16 next_read_offset
)
667 BUG_ON(next_read_offset
> item
->datalen
);
669 if (next_read_offset
>= item
->datalen
)
672 spin_lock_bh(&(trgt_sock
->rcv_lock
));
674 if (unlikely(cor_is_trgt_sock(trgt_sock
, cs
) == 0)) {
675 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
679 BUG_ON(trgt_sock
->data_buf
.nextread
!= 0 &&
680 &(trgt_sock
->data_buf
.nextread
->buf_list
) !=
681 trgt_sock
->data_buf
.items
.next
);
682 BUG_ON(trgt_sock
->data_buf
.next_read_offset
!= 0);
684 trgt_sock
->data_buf
.first_offset
-= item
->datalen
;
685 trgt_sock
->data_buf
.datasize
+= item
->datalen
;
686 trgt_sock
->data_buf
.read_remaining
+= item
->datalen
- next_read_offset
;
688 if (item
->type
== DATABUF_BUF
) {
689 trgt_sock
->data_buf
.overhead
+=
690 sizeof(struct cor_data_buf_item
) +
691 item
->buflen
- item
->datalen
;
692 } else if (item
->type
== DATABUF_SKB
) {
693 trgt_sock
->data_buf
.overhead
+= sizeof(struct sk_buff
);
698 list_add(&(item
->buf_list
), &(trgt_sock
->data_buf
.items
));
700 trgt_sock
->data_buf
.nextread
= item
;
701 trgt_sock
->data_buf
.next_read_offset
= next_read_offset
;
703 cor_account_bufspace(trgt_sock
);
705 spin_unlock_bh(&(trgt_sock
->rcv_lock
));
709 cor_databuf_item_free(item
);
713 void cor_databuf_pull_dbi(struct cor_sock
*cs_rl
, struct cor_conn
*trgt_sock_l
)
715 struct cor_data_buf_item
*dbi
= 0;
716 BUG_ON(cs_rl
->type
!= CS_TYPE_CONN_RAW
);
717 BUG_ON(cs_rl
->data
.conn_raw
.rcvitem
!= 0);
719 if (trgt_sock_l
->data_buf
.read_remaining
== 0)
722 BUG_ON(trgt_sock_l
->data_buf
.nextread
== 0);
723 BUG_ON(trgt_sock_l
->data_buf
.next_read_offset
>=
724 trgt_sock_l
->data_buf
.nextread
->datalen
);
725 dbi
= trgt_sock_l
->data_buf
.nextread
;
727 BUG_ON(&(dbi
->buf_list
) != trgt_sock_l
->data_buf
.items
.next
);
729 cs_rl
->data
.conn_raw
.rcvitem
= dbi
;
730 cs_rl
->data
.conn_raw
.rcvoffset
= trgt_sock_l
->data_buf
.next_read_offset
;
732 trgt_sock_l
->data_buf
.first_offset
+= dbi
->datalen
;
733 trgt_sock_l
->data_buf
.datasize
-= dbi
->datalen
;
734 trgt_sock_l
->data_buf
.read_remaining
-= dbi
->datalen
;
736 cor_account_bufspace(trgt_sock_l
);
738 trgt_sock_l
->data_buf
.next_read_offset
= dbi
->datalen
;
739 cor_databuf_nextreadchunk(trgt_sock_l
);
741 cor_databuf_item_unlink(trgt_sock_l
, dbi
);
744 void cor_databuf_unpull(struct cor_conn
*trgt_out_l
, __u32 bytes
)
746 trgt_out_l
->data_buf
.read_remaining
+= bytes
;
748 BUG_ON(list_empty(&(trgt_out_l
->data_buf
.items
)) != 0);
750 if (trgt_out_l
->data_buf
.nextread
== 0) {
751 BUG_ON(trgt_out_l
->data_buf
.next_read_offset
!= 0);
753 trgt_out_l
->data_buf
.nextread
= container_of(
754 trgt_out_l
->data_buf
.items
.prev
,
755 struct cor_data_buf_item
, buf_list
);
758 while (bytes
> trgt_out_l
->data_buf
.next_read_offset
) {
759 bytes
-= trgt_out_l
->data_buf
.next_read_offset
;
760 trgt_out_l
->data_buf
.nextread
= container_of(
761 trgt_out_l
->data_buf
.nextread
->buf_list
.prev
,
762 struct cor_data_buf_item
, buf_list
);
763 BUG_ON(&(trgt_out_l
->data_buf
.nextread
->buf_list
) ==
764 &(trgt_out_l
->data_buf
.items
));
765 trgt_out_l
->data_buf
.next_read_offset
=
766 trgt_out_l
->data_buf
.nextread
->datalen
;
769 trgt_out_l
->data_buf
.next_read_offset
-= bytes
;
772 void cor_databuf_pullold(struct cor_conn
*trgt_out_l
, __u64 startpos
, char *dst
,
775 __u64 pos
= trgt_out_l
->data_buf
.first_offset
;
776 struct cor_data_buf_item
*dbi
= container_of(
777 trgt_out_l
->data_buf
.items
.next
,
778 struct cor_data_buf_item
, buf_list
);
781 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
783 if (cor_seqno_after(pos
+ dbi
->datalen
, startpos
))
787 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
794 char *srcbufcpystart
= 0;
795 int srcbufcpylen
= 0;
797 __u64 offset
= cor_seqno_clean(startpos
- pos
);
799 BUG_ON(&(dbi
->buf_list
) == &(trgt_out_l
->data_buf
.items
));
801 BUG_ON(cor_seqno_before(startpos
, pos
));
802 BUG_ON(offset
> dbi
->datalen
);
804 srcbufcpystart
= dbi
->buf
+ offset
;
805 srcbufcpylen
= dbi
->datalen
- offset
;
807 if (cpy
> srcbufcpylen
)
810 memcpy(dst
, srcbufcpystart
, cpy
);
817 dbi
= container_of(dbi
->buf_list
.next
, struct cor_data_buf_item
,
822 /* ack up to *not* including pos */
823 void cor_databuf_ack(struct cor_conn
*trgt_out_l
, __u64 pos
)
827 while (!list_empty(&(trgt_out_l
->data_buf
.items
))) {
828 struct cor_data_buf_item
*firstitem
= container_of(
829 trgt_out_l
->data_buf
.items
.next
,
830 struct cor_data_buf_item
, buf_list
);
832 if (firstitem
== trgt_out_l
->data_buf
.nextread
)
835 if (cor_seqno_after_eq(trgt_out_l
->data_buf
.first_offset
+
836 firstitem
->datalen
, pos
))
839 trgt_out_l
->data_buf
.first_offset
+= firstitem
->datalen
;
840 acked
+= firstitem
->datalen
;
842 cor_databuf_item_unlink(trgt_out_l
, firstitem
);
843 cor_databuf_item_free(firstitem
);
846 trgt_out_l
->data_buf
.datasize
-= acked
;
848 BUG_ON(trgt_out_l
->data_buf
.datasize
== 0 &&
849 trgt_out_l
->data_buf
.overhead
!= 0);
852 cor_account_bufspace(trgt_out_l
);
855 void cor_databuf_ackread(struct cor_conn
*cn_lx
)
859 while (!list_empty(&(cn_lx
->data_buf
.items
))) {
860 struct cor_data_buf_item
*firstitem
= container_of(
861 cn_lx
->data_buf
.items
.next
,
862 struct cor_data_buf_item
, buf_list
);
864 if (firstitem
== cn_lx
->data_buf
.nextread
)
867 acked
+= firstitem
->datalen
;
869 cor_databuf_item_unlink(cn_lx
, firstitem
);
870 cor_databuf_item_free(firstitem
);
873 cn_lx
->data_buf
.datasize
-= acked
;
874 cn_lx
->data_buf
.first_offset
+= acked
;
876 BUG_ON(cn_lx
->data_buf
.datasize
== 0 && cn_lx
->data_buf
.overhead
!= 0);
879 cor_account_bufspace(cn_lx
);
882 __u32
cor_receive_buf(struct cor_conn
*cn_lx
, char *buf
, __u32 datalen
,
883 int rcv_delayed_lowbuf
, __u8 flush
)
885 struct cor_data_buf_item
*item
= 0;
889 if (list_empty(&(cn_lx
->data_buf
.items
)) == 0) {
890 struct list_head
*last
= cn_lx
->data_buf
.items
.prev
;
891 item
= container_of(last
, struct cor_data_buf_item
, buf_list
);
894 while (datalen
> 0) {
897 BUG_ON(cn_lx
->data_buf
.datasize
+ datalen
> (1 << 30));
898 BUG_ON(cn_lx
->data_buf
.overhead
> (1 << 30));
900 if (item
== 0 || item
->type
!= DATABUF_BUF
||
901 item
->buflen
<= item
->datalen
) {
902 item
= kmem_cache_alloc(cor_data_buf_item_slab
,
904 if (unlikely(item
== 0))
907 memset(item
, 0, sizeof(struct cor_data_buf_item
));
908 item
->type
= DATABUF_BUF
;
910 item
->buflen
= cor_buf_optlen(datalen
);
911 item
->buf
= kmalloc(item
->buflen
, GFP_ATOMIC
);
913 if (unlikely(item
->buf
== 0)) {
914 kmem_cache_free(cor_data_buf_item_slab
, item
);
919 list_add_tail(&(item
->buf_list
),
920 &(cn_lx
->data_buf
.items
));
922 cn_lx
->data_buf
.overhead
+= item
->buflen
+
923 sizeof(struct cor_data_buf_item
);
926 BUG_ON(item
->type
!= DATABUF_BUF
);
927 BUG_ON(item
->buflen
<= item
->datalen
);
929 if (cn_lx
->data_buf
.nextread
== 0) {
930 cn_lx
->data_buf
.nextread
= item
;
931 cn_lx
->data_buf
.next_read_offset
= item
->datalen
;
934 if (item
->buflen
- item
->datalen
< cpy
)
935 cpy
= (item
->buflen
- item
->datalen
);
937 memcpy(item
->buf
+ item
->datalen
, buf
, cpy
);
938 item
->datalen
+= cpy
;
940 BUG_ON(cpy
> datalen
);
945 cn_lx
->data_buf
.read_remaining
+= cpy
;
946 cn_lx
->data_buf
.datasize
+= cpy
;
947 cn_lx
->data_buf
.overhead
-= cpy
;
948 BUG_ON(cn_lx
->data_buf
.datasize
!= 0 &&
949 cn_lx
->data_buf
.overhead
== 0);
954 cn_lx
->flush
= flush
;
956 cor_account_bufspace(cn_lx
);
957 cor_bufsize_update(cn_lx
, totalcpy
, rcv_delayed_lowbuf
, flush
);
962 __u32
cor_receive_skb(struct cor_conn
*src_in_l
, struct sk_buff
*skb
,
963 int rcv_delayed_lowbuf
, __u8 flush
)
965 struct cor_skb_procstate
*ps
= cor_skb_pstate(skb
);
966 struct cor_data_buf_item
*item
= &(ps
->funcstate
.rcv
.dbi
);
968 __u32 bufferleft
= 0;
970 BUG_ON(skb
->len
<= 0);
972 if (unlikely(unlikely(src_in_l
->data_buf
.datasize
+ skb
->len
>
973 (1 << 30)) || unlikely(src_in_l
->data_buf
.overhead
>
977 if (list_empty(&(src_in_l
->data_buf
.items
)) == 0) {
978 struct list_head
*last
= src_in_l
->data_buf
.items
.prev
;
979 struct cor_data_buf_item
*item
= container_of(last
,
980 struct cor_data_buf_item
, buf_list
);
981 bufferleft
= item
->buflen
- item
->datalen
;
984 if (skb
->len
< (sizeof(struct sk_buff
) + bufferleft
)) {
985 __u32 rc
= cor_receive_buf(src_in_l
, skb
->data
, skb
->len
,
986 rcv_delayed_lowbuf
, flush
);
987 if (likely(rc
== skb
->len
))
992 memset(item
, 0, sizeof(struct cor_data_buf_item
));
994 item
->type
= DATABUF_SKB
;
995 item
->buf
= skb
->data
;
996 item
->datalen
= skb
->len
;
997 item
->buflen
= item
->datalen
;
998 list_add_tail(&(item
->buf_list
), &(src_in_l
->data_buf
.items
));
999 if (src_in_l
->data_buf
.nextread
== 0)
1000 src_in_l
->data_buf
.nextread
= item
;
1002 src_in_l
->data_buf
.read_remaining
+= item
->datalen
;
1003 src_in_l
->data_buf
.datasize
+= item
->datalen
;
1004 src_in_l
->data_buf
.overhead
+= sizeof(struct sk_buff
);
1006 cor_account_bufspace(src_in_l
);
1007 cor_bufsize_update(src_in_l
, skb
->len
, rcv_delayed_lowbuf
, flush
);
1009 src_in_l
->flush
= flush
;
1014 void cor_wake_sender(struct cor_conn
*cn
)
1016 spin_lock_bh(&(cn
->rcv_lock
));
1018 if (unlikely(cn
->isreset
)) {
1019 spin_unlock_bh(&(cn
->rcv_lock
));
1023 switch (cn
->sourcetype
) {
1024 case SOURCE_UNCONNECTED
:
1025 spin_unlock_bh(&(cn
->rcv_lock
));
1026 spin_lock_bh(&(cn
->reversedir
->rcv_lock
));
1027 if (likely(cn
->reversedir
->isreset
== 0 &&
1028 cn
->reversedir
->targettype
==
1029 TARGET_UNCONNECTED
))
1030 cor_proc_cpacket(cn
->reversedir
);
1031 spin_unlock_bh(&(cn
->reversedir
->rcv_lock
));
1034 if (_cor_mngdsocket_flushtoconn(cn
) == RC_FTC_OK
&&
1035 cn
->source
.sock
.cs
!= 0 &&
1036 cor_sock_sndbufavailable(cn
, 1))
1037 cor_sk_write_space(cn
->source
.sock
.cs
);
1038 spin_unlock_bh(&(cn
->rcv_lock
));
1041 cor_drain_ooo_queue(cn
);
1042 if (likely(cn
->source
.in
.established
!= 0)) {
1043 cor_send_ack_conn_ifneeded(cn
, 0, 0);
1045 spin_unlock_bh(&(cn
->rcv_lock
));
1052 int __init
cor_forward_init(void)
1054 cor_data_buf_item_slab
= kmem_cache_create("cor_data_buf_item",
1055 sizeof(struct cor_data_buf_item
), 8, 0, 0);
1056 if (unlikely(cor_data_buf_item_slab
== 0))
1059 atomic64_set(&cor_bufused_sum
, 0);
1064 void __exit
cor_forward_exit2(void)
1066 BUG_ON(atomic64_read(&cor_bufused_sum
) != 0);
1068 kmem_cache_destroy(cor_data_buf_item_slab
);
1069 cor_data_buf_item_slab
= 0;
1072 MODULE_LICENSE("GPL");