/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */

void cor_databuf_init(struct cor_conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

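/*
 * bufsize.bufsize is a fixed point value: the buffer size in bytes shifted
 * left by BUFSIZE_SHIFT (5). The fractional bits allow _cor_bufsize_update()
 * to grow or shrink the buffer by less than one byte per received byte.
 */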
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << BUFSIZE_SHIFT;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

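/*
 * Recompute the buffer space charged to this conn (buffered data plus
 * overhead plus the reorder queue for SOURCE_IN) and fold the difference
 * into the global cor_bufused_sum. Returns 1 if the conn is at its per-conn
 * limit or the global limit is reached, i.e. the caller should stop
 * buffering more data, 0 otherwise.
 */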
int cor_account_bufspace(struct cor_conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&cor_bufused_sum) >=
				BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (cn_lx->targettype == TARGET_OUT && unlikely(
			unlikely(cn_lx->target.out.in_nb_busy_list == 0) !=
			unlikely(cn_lx->data_buf.datasize == 0)))
		cor_conn_set_last_act(cn_lx);

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cor_conn_src_unconn_write_allowed(struct cor_conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_SRC_UNCONN &&
			cor_account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}

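/*
 * Recompute the announced receive window: the current bufsize in bytes,
 * falling back to WINDOW_MAX_PER_CONN_MIN when bufsize is below it or
 * buffer space is tight, capped at WINDOW_MAX_PER_CONN_MAX, minus the data
 * that is already buffered but unread. The window end is stored as a seqno
 * relative to next_seqno.
 */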
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (cor_seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (cor_account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				cor_seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u\n", src_in_lx, bufsize,
			src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}

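/*
 * A sender counts as "high latency" if the estimated retransmit latency
 * plus its standard deviation plus the maximum low latency conn ack delay
 * exceeds 100ms.
 */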
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
	struct cor_neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}

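/*
 * Change rates are encoded relative to 128: 128 means the buffer size stays
 * as it is, lower values shrink it, higher values grow it. For TARGET_OUT
 * conns the local rate is multiplied by the rate announced by the remote
 * side. The returned value is offset by -64 so that the interesting range
 * (64..319) fits into a __u8; the receiver adds the 64 back.
 */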
__u8 _cor_bufsize_update_get_changerate(struct cor_conn *cn_lx)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);

	__u32 changerate;

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		changerate = 128;
	} else if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 4;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;
		if (high_latency_sender)
			speed *= 2;
		if (high_latency_conn)
			speed /= 2;

		changerate = 128 - speed;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 4;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		changerate = 128 + speed;
	} else {
		BUG();
	}

	/* printk(KERN_ERR "changerate1 %u\n", changerate); */

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 remote_changerate = ((__u16)
				cn_lx->target.out.remote_bufsize_changerate) +
				64;
		/* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
		changerate = (changerate * remote_changerate) / 128;
		/* printk(KERN_ERR "changerate3 %u\n", changerate); */
	}

	if (unlikely(changerate < 64))
		return 0;
	else if (unlikely(changerate - 64 >= 256))
		return 255;
	else
		return (__u8) (changerate - 64);
}

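/*
 * Buffer size state machine: BUFSIZE_NOACTION keeps the size and counts
 * act.noact.bytesleft down; when it hits zero the state switches to
 * BUFSIZE_DECR. DECR/DECR_FAST shrink bufsize proportionally to the bytes
 * received, switching to DECR_FAST once below 1/4 of act.decr.size_start.
 * INCR/INCR_FAST grow it until act.incr.size_end is reached, then return to
 * BUFSIZE_NOACTION.
 */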
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	/*
	 * If you change the speed here, change it in
	 * _cor_bufsize_update_get_changerate too
	 */

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-3));
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-2));
			}
		}
	} else {
		BUG();
	}

	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}

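/*
 * Minimum "unread bytes" threshold below which the buffer is considered to
 * run empty (see cor_bufsize_update()). The divisor depends on the conn and
 * sender latency and on bufspace_low, i.e. whether cor_bufused_sum is at
 * 3/4 of BUFUSAGE_GLOBAL_MAX or more:
 *
 *	conn latency	sender latency	bufspace_low	otherwise
 *	high		high		1/6		1/3
 *	high		low		1/8		1/4
 *	low		high		1/6		1/4
 *	low		low		1/12		1/8
 */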
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}

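/*
 * Called for every received chunk. Three thresholds are derived from
 * cor_get_read_remaining_min(): below min the buffer grows, below 1.5*min
 * (read_remaining_min_nodecr) shrinking is stopped, below 2*min
 * (read_remaining_min_nofastdecr) fast shrinking is throttled to normal.
 * windowused is the consumed part of the announced window in units of 1/31.
 */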
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		__u8 windowused, __u8 rcv_flushrcvd)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);
	BUG_ON(windowused > 31);

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;

		read_remaining = bufsize_bytes;
	} else {
		read_remaining = max(cn_lx->data_buf.read_remaining - rcvd,
				(bufsize_bytes * (31 - windowused)) / 31);
	}

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;
			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 rem_changerate = ((__u16)
				cn_lx->target.out.remote_bufsize_changerate) +
				64;
		__u16 crate_nofastdecr;
		__u16 crate_nodecr;
		__u16 crate_nofastincr;
		__u16 crate_noincr;

		if (high_latency_conn) {
			crate_nofastdecr = 128 - 128/8;
			crate_nodecr = 128 - 128/6;
			crate_nofastincr = 128 + 128/4;
			crate_noincr = 128 + 128/3;
		} else {
			crate_nofastdecr = 128 - 128/16;
			crate_nodecr = 128 - 128/12;
			crate_nofastincr = 128 + 128/8;
			crate_noincr = 128 + 128/6;
		}

		if ((rem_changerate < crate_nodecr ||
				rem_changerate > crate_noincr) &&
				cn_lx->bufsize.state == BUFSIZE_NOACTION) {
			cn_lx->bufsize.act.noact.bytesleft = max(
					cn_lx->bufsize.act.noact.bytesleft,
					cn_lx->bufsize.bufsize >>
					BUFSIZE_SHIFT);
		}

		if (rem_changerate < crate_nodecr && (
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate < crate_nofastdecr &&
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}

		if (rem_changerate > crate_noincr && (
				cn_lx->bufsize.state == BUFSIZE_INCR ||
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate > crate_nofastincr &&
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
			high_latency_conn);
}

void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
		struct cor_data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);
		freed += item->datalen;

		cor_databuf_item_unlink(cn_lx, item);
		cor_databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void cor_reset_seqno(struct cor_conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct cor_data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

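/*
 * Copy len bytes from the item chain to dst, advancing nextread and
 * next_read_offset. The caller must make sure that read_remaining >= len.
 */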
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			cor_databuf_nextreadchunk(cn_lx);
	}
}

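/*
 * Give back a data buf item previously handed out to a socket (see
 * cor_databuf_pull_dbi()). If only part of it was consumed, it is put back
 * at the head of the item list so the unread remainder (starting at
 * next_read_offset) is delivered again; a fully consumed item is freed.
 */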
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
		struct cor_data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead +=
				sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	cor_account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	if (0) {
free:
		cor_databuf_item_free(item);
	}
}

void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
	struct cor_data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	cor_account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	cor_databuf_nextreadchunk(trgt_sock_l);

	cor_databuf_item_unlink(trgt_sock_l, dbi);
}

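/*
 * Rewind the read position by bytes, walking nextread back across item
 * boundaries if needed. The rewound data must still be in the item list,
 * i.e. not yet freed by cor_databuf_ack()/cor_databuf_ackread().
 */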
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct cor_data_buf_item, buf_list);
		/*
		 * everything was consumed: start rewinding from the end of
		 * the last item
		 */
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct cor_data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u64 startpos, char *dst,
		int len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct cor_data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct cor_data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (cor_seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = cor_seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(cor_seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		cor_databuf_item_unlink(trgt_out_l, firstitem);
		cor_databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (unlikely(trgt_out_l->target.out.nblist_busy_remaining <= acked)) {
		trgt_out_l->target.out.nblist_busy_remaining = 0;
		cor_conn_set_last_act(trgt_out_l);
	} else {
		trgt_out_l->target.out.nblist_busy_remaining -= acked;
	}

	if (acked != 0)
		cor_account_bufspace(trgt_out_l);
}

void cor_databuf_ackread(struct cor_conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		cor_databuf_item_unlink(cn_lx, firstitem);
		cor_databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (cn_lx->targettype == TARGET_OUT) {
		if (unlikely(cn_lx->target.out.nblist_busy_remaining <=
				acked)) {
			cn_lx->target.out.nblist_busy_remaining = 0;
			cor_conn_set_last_act(cn_lx);
		} else {
			cn_lx->target.out.nblist_busy_remaining -= acked;
		}
	}

	if (acked != 0)
		cor_account_bufspace(cn_lx);
}

__u32 _cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
		int from_sock, __u8 windowused, __u8 flush)
{
	struct cor_data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct cor_data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(cor_data_buf_item_slab,
					GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct cor_data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = cor_buf_optlen(datalen, from_sock);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(cor_data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct cor_data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	cor_account_bufspace(cn_lx);
	cor_bufsize_update(cn_lx, totalcpy, windowused, flush);

	return totalcpy;
}

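/*
 * Queue received data without copying where it pays off: if the skb payload
 * is smaller than the sk_buff overhead plus the free space left in the last
 * buffer item, the data is copied via cor_receive_buf() and the skb freed;
 * otherwise the skb itself is kept as a DATABUF_SKB item.
 */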
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
		__u8 windowused, __u8 flush)
{
	struct cor_skb_procstate *ps = cor_skb_pstate(skb);
	struct cor_data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct cor_data_buf_item *lastitem = container_of(last,
				struct cor_data_buf_item, buf_list);
		bufferleft = lastitem->buflen - lastitem->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
				windowused, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);
		return rc;
	}

	memset(item, 0, sizeof(struct cor_data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	cor_account_bufspace(src_in_l);
	cor_bufsize_update(src_in_l, skb->len, windowused, flush);

	src_in_l->flush = flush;

	return skb->len;
}

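/*
 * Let whatever feeds this conn make progress again, depending on the source
 * type: re-run control packet processing on the reverse direction
 * (SOURCE_UNCONNECTED), flush the managed socket buffer and wake up a
 * waiting writer (SOURCE_SOCK), or drain the out-of-order queue and ack
 * (SOURCE_IN).
 */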
void cor_wake_sender(struct cor_conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
		if (likely(cor_get_conn_reversedir(cn)->isreset == 0 &&
				cor_get_conn_reversedir(cn)->targettype ==
				TARGET_UNCONNECTED))
			cor_proc_cpacket(cor_get_conn_reversedir(cn));
		spin_unlock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
				cn->source.sock.ed->cs != 0 &&
				cor_sock_sndbufavailable(cn, 1))
			cor_sk_write_space(cn->source.sock.ed->cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		cor_drain_ooo_queue(cn);
		if (likely(cn->source.in.established != 0)) {
			cor_send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init cor_forward_init(void)
{
	cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct cor_data_buf_item), 8, 0, 0);
	if (unlikely(cor_data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&cor_bufused_sum, 0);

	return 0;
}

void __exit cor_forward_exit2(void)
{
	BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

	kmem_cache_destroy(cor_data_buf_item_slab);
	cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");