net/cor/conn_databuf.c
/**
 * Connection oriented routing
 * Copyright (C) 2007-2021  Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */
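/*
 * cor_bufused_sum is the global sum of the buffer space accounted to all
 * conns; it is maintained by cor_account_bufspace() below.
 */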
void cor_databuf_init(struct cor_conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}
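/*
 * bufsize.bufsize is a fixed point value with BUFSIZE_SHIFT (= 5)
 * fractional bits, i.e. bufsize.bufsize >> BUFSIZE_SHIFT is the buffer
 * size in bytes; e.g. an initial bufsize of 4096 bytes is stored as
 * 4096 << 5 == 131072.
 */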
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}
#warning todo reset stalled conns on low memory
/**
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset the connection which has been on this list the longest
 */
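/*
 * Recompute the buffer space used by this conn (data + bookkeeping
 * overhead and, for SOURCE_IN, reorder memory), fold the difference into
 * cor_bufused_sum and cache it in bufspace_accounted. Returns 1 if the
 * per-conn or global limit is reached, 0 otherwise; the callers below
 * treat a nonzero return as "do not grow buffers any further".
 */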
int cor_account_bufspace(struct cor_conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&cor_bufused_sum) >=
				BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}
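/* May another control packet be buffered on this unconnected conn? */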
int cor_cpacket_write_allowed(struct cor_conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			cor_account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}
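/*
 * Recompute the receive window advertised to the previous hop: start from
 * bufsize, clamp by the per-conn limits and buffer space pressure,
 * subtract what is already buffered and publish the result as
 * window_seqnolimit relative to next_seqno.
 */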
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (cor_seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (cor_account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				cor_seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}
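/*
 * A sender counts as high latency if the neighbor's retransmit latency
 * estimate (mean + stddev + the low latency conn ack delay) exceeds 100ms.
 */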
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
	struct cor_neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}
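/*
 * Encode the bufsize trend as a change rate in units of 1/128: 128 means
 * unchanged, the BUFSIZE_DECR* states map below 128, the BUFSIZE_INCR*
 * states above. For TARGET_OUT conns the rate announced by the remote
 * side is multiplied in. The result is biased by -64 so it fits a __u8,
 * e.g. BUFSIZE_INCR with a low latency sender and conn gives speed 4,
 * changerate 132 and a return value of 68.
 */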
__u8 _cor_bufsize_update_get_changerate(struct cor_conn *cn_lx)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);

	__u32 changerate;

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		changerate = 128;
	} else if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 4;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;
		if (high_latency_sender)
			speed *= 2;
		if (high_latency_conn)
			speed /= 2;

		changerate = 128 - speed;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 4;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		changerate = 128 + speed;
	} else {
		BUG();
	}

	/* printk(KERN_ERR "changerate1 %u", changerate); */

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 remote_changerate = ((__u16)
				cn_lx->target.out.remote_bufsize_changerate) +
				64;
		/* printk(KERN_ERR "changerate2 %u", remote_changerate); */
		changerate = (changerate * remote_changerate) / 128;
		/* printk(KERN_ERR "changerate3 %u", changerate); */
	}

	if (unlikely(changerate < 64))
		return 0;
	else if (unlikely(changerate - 64 >= 256))
		return 255;
	else
		return (__u8) (changerate - 64);
}
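/*
 * Apply the bufsize change for rcvd freshly received bytes: NOACTION
 * consumes bytesleft before tipping over into DECR; DECR* shrinks and
 * INCR* grows bufsize proportionally to rcvd, scaled by sender/conn
 * latency and (for increases) the initial phase. bytes_rcvd saturates at
 * 2^24 - 1.
 */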
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	/**
	 * If you change the speed here, change it in
	 * _cor_bufsize_update_get_changerate too
	 */

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-3));
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-2));
			}
		}
	} else {
		BUG();
	}

	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}
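/*
 * Threshold under which the unread backlog counts as low: a fraction of
 * the bufsize in bytes, more generous for high latency conns/senders and
 * stricter once the global buffer pool is more than 3/4 full.
 */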
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}
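/*
 * Drive the bufsize state machine: a received flush ends any decrease,
 * deliveries that were delayed by low buffer space push towards
 * INCR/INCR_FAST depending on how far read_remaining is below the
 * minimum, and for TARGET_OUT conns the change rate announced by the
 * remote side damps local increases/decreases before
 * _cor_bufsize_update() applies the change.
 */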
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;
			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 rem_changerate = ((__u16)
				cn_lx->target.out.remote_bufsize_changerate) +
				64;
		__u16 crate_nofastdecr;
		__u16 crate_nodecr;
		__u16 crate_nofastincr;
		__u16 crate_noincr;

		if (high_latency_conn) {
			crate_nofastdecr = 128 - 128/8;
			crate_nodecr = 128 - 128/6;
			crate_nofastincr = 128 + 128/4;
			crate_noincr = 128 + 128/3;
		} else {
			crate_nofastdecr = 128 - 128/16;
			crate_nodecr = 128 - 128/12;
			crate_nofastincr = 128 + 128/8;
			crate_noincr = 128 + 128/6;
		}

		if ((rem_changerate < crate_nodecr ||
				rem_changerate > crate_noincr) &&
				cn_lx->bufsize.state == BUFSIZE_NOACTION) {
			cn_lx->bufsize.act.noact.bytesleft = max(
					cn_lx->bufsize.act.noact.bytesleft,
					cn_lx->bufsize.bufsize >>
					BUFSIZE_SHIFT);
		}

		if (rem_changerate < crate_nodecr && (
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate < crate_nofastdecr &&
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}

		if (rem_changerate > crate_noincr && (
				cn_lx->bufsize.state == BUFSIZE_INCR ||
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate > crate_nofastincr &&
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
			high_latency_conn);
}
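/*
 * If userspace has kept the receive queue waiting for longer than the
 * latency limit, ignore the rcv_delayed_lowbuf hint for roughly one
 * bufsize worth of data: the backlog is then presumably caused by the
 * reader, not by a too small buffer.
 */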
void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}
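/*
 * Unlink item from the databuf and subtract its bookkeeping overhead;
 * the payload itself remains accounted in datasize.
 */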
static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
		struct cor_data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}
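/*
 * Discard all buffered data and reset the read state; datasize and
 * overhead must drop to zero in the process.
 */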
void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);
		freed += item->datalen;

		cor_databuf_item_unlink(cn_lx, item);
		cor_databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}
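/* Set first_offset such that the current read position equals initseqno. */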
void cor_reset_seqno(struct cor_conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}
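/*
 * Advance nextread to the following item once the current one has been
 * fully consumed; at the end of the list nextread becomes 0.
 */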
static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct cor_data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			cor_databuf_nextreadchunk(cn_lx);
	}
}
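/*
 * Put a partially consumed item back at the head of the databuf (raw
 * socket receive path), rewinding offsets and accounting; a fully
 * consumed item is freed instead.
 */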
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
		struct cor_data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead +=
				sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	cor_account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	if (0) {
free:
		cor_databuf_item_free(item);
	}
}
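/* Detach the next unread item and hand it to the raw socket as rcvitem. */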
void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
	struct cor_data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	cor_account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	cor_databuf_nextreadchunk(trgt_sock_l);

	cor_databuf_item_unlink(trgt_sock_l, dbi);
}
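/*
 * Move the read position back by bytes, walking the item list backwards
 * from nextread, so that previously pulled data can be read again.
 */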
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct cor_data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct cor_data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}
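/*
 * Copy len bytes starting at seqno startpos out of the buffer without
 * moving the read position; the data may lie before the current read
 * position, hence "old".
 */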
void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u64 startpos, char *dst,
		int len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct cor_data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct cor_data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (cor_seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = cor_seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(cor_seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}
}
/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		cor_databuf_item_unlink(trgt_out_l, firstitem);
		cor_databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (acked != 0)
		cor_account_bufspace(trgt_out_l);
}
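/* Free all items that were read completely, i.e. everything before nextread. */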
void cor_databuf_ackread(struct cor_conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		cor_databuf_item_unlink(cn_lx, firstitem);
		cor_databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (acked != 0)
		cor_account_bufspace(cn_lx);
}
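/*
 * Append datalen bytes to the databuf, filling up the last DATABUF_BUF
 * item before allocating new ones (GFP_ATOMIC, so this may fall short).
 * Returns the number of bytes actually buffered; flush is only recorded
 * if everything fit.
 */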
__u32 cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct cor_data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct cor_data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(cor_data_buf_item_slab,
					GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct cor_data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = cor_buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(cor_data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct cor_data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	cor_account_bufspace(cn_lx);
	cor_bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}
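/*
 * Queue an skb as a zero-copy DATABUF_SKB item. If the payload is smaller
 * than the sk_buff bookkeeping overhead plus the space left in the last
 * item, it is copied via cor_receive_buf() instead and the skb is freed.
 */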
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct cor_skb_procstate *ps = cor_skb_pstate(skb);
	struct cor_data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct cor_data_buf_item *item = container_of(last,
				struct cor_data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);
		return rc;
	}

	memset(item, 0, sizeof(struct cor_data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	cor_account_bufspace(src_in_l);
	cor_bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}
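/*
 * Wake up whatever feeds this conn: depending on the source type, process
 * queued control packets, signal write space to the socket or drain the
 * out-of-order queue and send a conn ack if needed.
 */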
void cor_wake_sender(struct cor_conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			cor_proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
				cn->source.sock.cs != 0 &&
				cor_sock_sndbufavailable(cn, 1))
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		cor_drain_ooo_queue(cn);
		if (likely(cn->source.in.established != 0)) {
			cor_send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}
int __init cor_forward_init(void)
{
	cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct cor_data_buf_item), 8, 0, 0);
	if (unlikely(cor_data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&cor_bufused_sum, 0);

	return 0;
}
void __exit cor_forward_exit2(void)
{
	BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

	kmem_cache_destroy(cor_data_buf_item_slab);
	cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");