/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */

void cor_databuf_init(struct cor_conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

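/*
 * Initialize the receive buffer size state of a connection. bufsize is
 * stored left-shifted by BUFSIZE_SHIFT so that gradual increases and
 * decreases can be tracked with sub-byte granularity.
 */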
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

#warning todo reset stalled conns on low memory
/*
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset connection which has been in this list longest
 */

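/*
 * Recalculate how much buffer space this connection currently uses (data,
 * per-item overhead and, for SOURCE_IN, reorder memory) and fold the
 * difference into the global cor_bufused_sum. Returns nonzero once the
 * per-connection or the global buffer usage limit has been reached.
 */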
int cor_account_bufspace(struct cor_conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&cor_bufused_sum) >=
				BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

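/*
 * Control packets may be appended to an unconnected conn as long as its
 * buffer is empty, or is still below BUFFERLIMIT_CPACKETS and the buffer
 * space accounting does not report that a limit has been hit.
 */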
int cor_cpacket_write_allowed(struct cor_conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			cor_account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}

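/*
 * Recompute the receive window announced to the peer: start from the
 * connection's bufsize, clamp it between WINDOW_MAX_PER_CONN_MIN and
 * WINDOW_MAX_PER_CONN_MAX (shrinking it when buffer space is tight or the
 * outgoing direction is stalled), subtract the data that is still queued
 * and advance source.in.window_seqnolimit accordingly.
 */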
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (cor_seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (cor_account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				cor_seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}

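/*
 * A sender is treated as "high latency" if the retransmit latency estimate
 * of the neighbor (plus its standard deviation and the maximum conn-ack
 * delay) exceeds 100ms.
 */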
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
	struct cor_neighbor *nb;
	__u32 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}

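/*
 * Advance the bufsize state machine by "rcvd" newly received bytes. In
 * BUFSIZE_NOACTION the byte budget is simply consumed; once it is used up
 * the state switches to BUFSIZE_DECR. The DECR/DECR_FAST states shrink
 * bufsize, the INCR/INCR_FAST states grow it towards act.incr.size_end,
 * with the speed depending on sender and connection latency.
 */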
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			goto out;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u32 change;
		__u32 speed = 1; /* scaling factors in this branch are assumed */

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u32 change;
		__u32 speed = 1; /* scaling factors in this branch are assumed */

		if (high_latency_sender)
			speed *= 2;

		if (cn_lx->bufsize.bytes_rcvd != (1 << 24) - 1 &&
				cn_lx->bufsize.bytes_rcvd <
				cn_lx->bufsize.bufsize)
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * 4;
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * 2;
			}
		}
	}

out:
	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}

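/*
 * Threshold (in bytes) of unread data below which the buffer is considered
 * to be running empty. The fraction of the buffer size depends on sender
 * and connection latency and is reduced further when the global buffer
 * usage is above 3/4 of BUFUSAGE_GLOBAL_MAX.
 */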
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}

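/*
 * Decide how the buffer size should react to newly received data: a flush
 * from the sender cancels pending decreases, receives that were delayed
 * because of low buffer space trigger an increase (or at least block
 * further decreases) depending on how much unread data is left relative to
 * read_remaining_min, and the actual adjustment is applied by
 * _cor_bufsize_update().
 */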
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);

		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u32 rtt_mul = 2; /* initial factor assumed */

			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
			high_latency_conn);
}

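/*
 * Called when buffered data is handed to a socket reader. If userspace has
 * been waiting too long to pick up the data, low-buffer signals are ignored
 * for the next bufsize worth of bytes so that a slow reader does not
 * inflate the buffer.
 */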
void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40); /* limits assumed */

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

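/*
 * Remove a buffer item from the item list and subtract the bookkeeping
 * overhead it contributed (the cor_data_buf_item plus unused buffer space
 * for DATABUF_BUF items, the sk_buff for DATABUF_SKB items).
 */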
static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
		struct cor_data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

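/*
 * Discard all buffered data of the conn as if it had been acked: free every
 * item, advance first_offset and clear the read state. Afterwards datasize
 * and overhead must both be zero.
 */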
void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		freed += item->datalen;

		cor_databuf_item_unlink(cn_lx, item);
		cor_databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void cor_reset_seqno(struct cor_conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

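/*
 * Advance nextread to the next buffer item once the current one has been
 * read completely; nextread becomes 0 when the end of the item list is
 * reached.
 */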
static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct cor_data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

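/*
 * Copy len bytes from the current read position into dst, walking the
 * buffer item list chunk by chunk and advancing nextread/next_read_offset.
 */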
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			cor_databuf_nextreadchunk(cn_lx);
	}
}

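/*
 * Put a partially consumed buffer item back at the head of the target
 * socket's data_buf (used when userspace did not read all of it), restoring
 * datasize, read_remaining and overhead accounting. If the item was read
 * completely, or the socket no longer belongs to this conn, the item is
 * freed instead.
 */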
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
		struct cor_data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free_item;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free_item;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead +=
				sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	cor_account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	return;

free_item:
	cor_databuf_item_free(item);
}

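/*
 * Detach the first buffer item from the conn and attach it to a
 * CS_TYPE_CONN_RAW socket as its current receive item, adjusting the conn's
 * buffer accounting accordingly.
 */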
void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
	struct cor_data_buf_item *dbi = 0;

	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	cor_account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	cor_databuf_nextreadchunk(trgt_sock_l);

	cor_databuf_item_unlink(trgt_sock_l, dbi);
}

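/*
 * Move the read position back by "bytes" (data that was pulled but could
 * not be sent), so that it will be read again later.
 */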
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct cor_data_buf_item, buf_list);
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct cor_data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

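/*
 * Copy already sent data starting at sequence number "startpos" into dst,
 * used for retransmissions; the data must still be buffered (not yet
 * acked).
 */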
void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u64 startpos, char *dst,
		__u32 len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct cor_data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct cor_data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (cor_seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = cor_seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(cor_seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;
		pos += dbi->datalen;

		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		cor_databuf_item_unlink(trgt_out_l, firstitem);
		cor_databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (acked != 0)
		cor_account_bufspace(trgt_out_l);
}

void cor_databuf_ackread(struct cor_conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		cor_databuf_item_unlink(cn_lx, firstitem);
		cor_databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (acked != 0)
		cor_account_bufspace(cn_lx);
}

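/*
 * Append datalen bytes from buf to the conn's data_buf, reusing free space
 * in the last DATABUF_BUF item and allocating new items as needed. Returns
 * the number of bytes actually buffered (may be less than datalen if an
 * allocation fails).
 */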
__u32 cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct cor_data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;

		item = container_of(last, struct cor_data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(cor_data_buf_item_slab,
					GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct cor_data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = cor_buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(cor_data_buf_item_slab, item);
				break;
			}

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct cor_data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		datalen -= cpy;
		buf += cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	cn_lx->flush = flush;

	cor_account_bufspace(cn_lx);
	cor_bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}

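/*
 * Buffer a received sk_buff. Small skbs (smaller than the sk_buff overhead
 * plus the free space left in the last item) are copied via
 * cor_receive_buf(); larger ones are kept as DATABUF_SKB items referencing
 * the skb data directly. Returns the number of bytes buffered.
 */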
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct cor_skb_procstate *ps = cor_skb_pstate(skb);
	struct cor_data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct cor_data_buf_item *item = container_of(last,
				struct cor_data_buf_item, buf_list);

		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);

		return rc;
	}

	memset(item, 0, sizeof(struct cor_data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	cor_account_bufspace(src_in_l);
	cor_bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}

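/*
 * Notify the sending side of a conn that it may proceed: process queued
 * control packets for unconnected conns, wake the sending socket, or drain
 * the out-of-order queue and send conn acks for SOURCE_IN.
 */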
void cor_wake_sender(struct cor_conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			cor_proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
				cn->source.sock.cs != 0 /* &&
				cor_sock_sndbufavailable(cn) */)
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		cor_drain_ooo_queue(cn);
		if (likely(cn->source.in.established != 0)) {
			cor_send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init cor_forward_init(void)
{
	cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct cor_data_buf_item), 8, 0, 0);
	if (unlikely(cor_data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&cor_bufused_sum, 0);

	return 0;
}

void __exit cor_forward_exit2(void)
{
	BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

	kmem_cache_destroy(cor_data_buf_item_slab);
	cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");