/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
        return atomic64_read(&cor_bufused_sum);
} */
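/*
 * Buffer space accounting: each conn's databuf memory (payload plus
 * bookkeeping overhead, plus reorder memory for SOURCE_IN conns) is
 * summed into the global cor_bufused_sum, so that receive windows can
 * be shrunk once per-conn or overall memory usage gets too high.
 */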
void cor_databuf_init(struct cor_conn *cn_init)
{
        memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
        INIT_LIST_HEAD(&(cn_init->data_buf.items));
}
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
        __u32 bufsize_shifted;

        memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

        if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
                bufsize_shifted = U32_MAX;
        else
                bufsize_shifted = bufsize << 5;

        cn_l->bufsize.bufsize = bufsize_shifted;
        cn_l->bufsize.state = BUFSIZE_NOACTION;
        cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}
#warning todo reset stalled conns on low memory
/**
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset the connection that has been in this list the longest
 */
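/*
 * Recompute the buffer space charged to this conn and fold the delta
 * into cor_bufused_sum. Returns 1 if the conn is over the per-conn
 * limit or the global limit is exceeded, 0 otherwise.
 */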
int cor_account_bufspace(struct cor_conn *cn_lx)
{
        __u64 space_needed = 0;
        __u32 space_req;
        __u64 bufused_sum_int;

        if (likely(cn_lx->isreset == 0)) {
                space_needed += cn_lx->data_buf.datasize;
                space_needed += cn_lx->data_buf.overhead;
                if (cn_lx->sourcetype == SOURCE_IN) {
                        space_needed += cn_lx->source.in.reorder_memused;
                }
        }

        if (cn_lx->bufspace_accounted == space_needed) {
                if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
                        return 1;
                } else if (atomic64_read(&cor_bufused_sum) >=
                                BUFUSAGE_GLOBAL_MAX) {
                        return 1;
                } else {
                        return 0;
                }
        }

        if (unlikely(space_needed >= U32_MAX))
                space_req = U32_MAX;
        else
                space_req = space_needed;

        if (cn_lx->bufspace_accounted == space_req)
                return 1;

        bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
                        cn_lx->bufspace_accounted, space_req);

        cn_lx->bufspace_accounted = space_req;

        if (space_needed != space_req)
                return 1;
        else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
                return 1;
        else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
                return 1;
        else
                return 0;
}
int cor_cpacket_write_allowed(struct cor_conn *src_unconn_lx)
{
        BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

        if (src_unconn_lx->data_buf.datasize == 0)
                return 1;
        else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
                        cor_account_bufspace(src_unconn_lx) == 0)
                return 1;
        else
                return 0;
}
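/*
 * Recompute source.in.window_seqnolimit: the window announced to the
 * sending neighbor is the buffer size minus the data still unread,
 * capped at WINDOW_MAX_PER_CONN_MAX, shrunk towards
 * WINDOW_MAX_PER_CONN_MIN when cor_account_bufspace() reports memory
 * pressure, and forced to 0 for TARGET_DISCARD conns.
 */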
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
        __u32 bufsize;

        BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

        bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

        if (src_in_lx->targettype != TARGET_OUT) {
                if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
                                cor_account_bufspace(src_in_lx)) {
                        bufsize = WINDOW_MAX_PER_CONN_MIN;
                }
        } else if (cor_seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
                        src_in_lx->target.out.seqno_nextsend)) {
                if (cor_account_bufspace(src_in_lx)) {
                        bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
                }
        } else {
                __u32 windowleft = (__u32) min((__u64) U32_MAX,
                                cor_seqno_clean(
                                src_in_lx->target.out.seqno_windowlimit -
                                src_in_lx->target.out.seqno_nextsend));

                bufsize = max(bufsize, min(windowleft,
                                (__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

                if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
                                cor_account_bufspace(src_in_lx)) {
                        bufsize = WINDOW_MAX_PER_CONN_MIN;
                }
        }

        if (bufsize > WINDOW_MAX_PER_CONN_MAX)
                bufsize = WINDOW_MAX_PER_CONN_MAX;

        /* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize,
                        src_in_lx->data_buf.read_remaining); */

        if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
                bufsize = 0;
        else
                bufsize -= src_in_lx->data_buf.read_remaining;

        if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
                bufsize = 0;

        src_in_lx->source.in.window_seqnolimit =
                        src_in_lx->source.in.next_seqno + bufsize;
}
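/*
 * A sender counts as "high latency" if the expected retransmit delay
 * (retransmit latency plus its standard deviation plus the maximum
 * conn ack delay) exceeds 100ms.
 */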
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
        struct cor_neighbor *nb;

        __u64 latency_us;

        if (cn_lx->sourcetype != SOURCE_IN)
                return 0;

        nb = cn_lx->source.in.nb;
        if (unlikely(nb == 0))
                return 0;

        latency_us = atomic_read(&(nb->latency_retrans_us));
        latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
        latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

        return latency_us > 100000 ? 1 : 0;
}
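/*
 * Advance the bufsize state machine by rcvd bytes. In BUFSIZE_NOACTION
 * the buffer size stays unchanged until act.noact.bytesleft is used
 * up, then a decrease starts; BUFSIZE_DECR/BUFSIZE_DECR_FAST shrink
 * bufsize, BUFSIZE_INCR/BUFSIZE_INCR_FAST grow it towards
 * act.incr.size_end, with the rate scaled by sender and conn latency.
 */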
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
                int high_latency_sender, int high_latency_conn)
{
        /* speed of increase/decrease changes with BUFSIZE_SHIFT */
        BUG_ON(BUFSIZE_SHIFT != 5);

        if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
                if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
                        cn_lx->bufsize.act.noact.bytesleft -= rcvd;
                        return;
                }

                rcvd -= cn_lx->bufsize.act.noact.bytesleft;
                cn_lx->bufsize.state = BUFSIZE_DECR;
                cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
        }

        if (cn_lx->bufsize.state == BUFSIZE_DECR ||
                        cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
                __u8 speed = 1;
                __u32 change;

                if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
                        speed *= 2;

                if (high_latency_sender)
                        speed *= 2;

                change = rcvd * speed;
                if (high_latency_conn)
                        change /= 2;

                if (cn_lx->bufsize.bufsize < change)
                        cn_lx->bufsize.bufsize = 0;
                else
                        cn_lx->bufsize.bufsize -= change;

                if (cn_lx->bufsize.act.decr.size_start / 4 >
                                cn_lx->bufsize.bufsize)
                        cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
        } else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
                        cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
                __u8 speed = 1;
                __u32 change;

                if (high_latency_sender)
                        speed *= 2;

                if (cn_lx->bufsize.bytes_rcvd != (1 << 24) - 1 &&
                                cn_lx->bufsize.bytes_rcvd <
                                cn_lx->bufsize.bufsize)
                        speed *= 4;
                else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
                        speed *= 2;

                change = rcvd * speed;
                if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
                        cn_lx->bufsize.bufsize = U32_MAX;
                else
                        cn_lx->bufsize.bufsize += change;

                if (cn_lx->bufsize.bufsize >=
                                cn_lx->bufsize.act.incr.size_end) {
                        cn_lx->bufsize.bufsize =
                                        cn_lx->bufsize.act.incr.size_end;
                        cn_lx->bufsize.state = BUFSIZE_NOACTION;
                        if (high_latency_conn) {
                                cn_lx->bufsize.act.noact.bytesleft =
                                                (cn_lx->bufsize.bufsize >>
                                                (BUFSIZE_SHIFT - 3));
                        } else {
                                cn_lx->bufsize.act.noact.bytesleft =
                                                (cn_lx->bufsize.bufsize >>
                                                (BUFSIZE_SHIFT - 2));
                        }
                }
        } else {
                BUG();
        }

        if (unlikely(rcvd >= (1 << 24)) ||
                        cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
                cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
        else
                cn_lx->bufsize.bytes_rcvd += rcvd;
}
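/*
 * Minimum amount of buffered-but-unread data below which the receiver
 * is considered to be draining the buffer too fast. Fractions of
 * bufsize_bytes (+1 so the result is never 0):
 *
 *                          bufspace_low   !bufspace_low
 *  highlat conn + sender:      1/6            1/3
 *  highlat conn only:          1/8            1/4
 *  highlat sender only:        1/6            1/4
 *  lowlat conn + sender:       1/12           1/8
 */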
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
                int high_latency_sender, int high_latency_conn)
{
        int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
                        3 * (BUFUSAGE_GLOBAL_MAX / 4));

        if (high_latency_conn) {
                if (high_latency_sender) {
                        if (bufspace_low) {
                                return bufsize_bytes / 6 + 1;
                        } else {
                                return bufsize_bytes / 3 + 1;
                        }
                } else {
                        if (bufspace_low) {
                                return bufsize_bytes / 8 + 1;
                        } else {
                                return bufsize_bytes / 4 + 1;
                        }
                }
        } else {
                if (high_latency_sender) {
                        if (bufspace_low) {
                                return bufsize_bytes / 6 + 1;
                        } else {
                                return bufsize_bytes / 4 + 1;
                        }
                } else {
                        if (bufspace_low) {
                                return bufsize_bytes / 12 + 1;
                        } else {
                                return bufsize_bytes / 8 + 1;
                        }
                }
        }
}
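/*
 * Decide whether the conn's buffer size should grow or shrink based on
 * how much unread data remains relative to cor_get_read_remaining_min().
 * A received flush cancels pending decreases, ignore_rcv_lowbuf
 * suppresses lowbuf-driven growth, and the actual size change is
 * applied by _cor_bufsize_update().
 */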
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
                int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
        int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
        int high_latency_conn = (cn_lx->is_highlatency != 0);
        __u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
        __u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
                        high_latency_sender, high_latency_conn);
        __u32 read_remaining_min_nofastdecr = read_remaining_min * 2;
        __u32 read_remaining_min_nodecr = (read_remaining_min +
                        read_remaining_min_nofastdecr) / 2;
        __u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

        BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

        if (rcv_flushrcvd != 0) {
                __u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);

                if (cn_lx->bufsize.state == BUFSIZE_DECR ||
                                cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
                        cn_lx->bufsize.state = BUFSIZE_NOACTION;
                        cn_lx->bufsize.act.noact.bytesleft = bytesleft;
                } else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
                                cn_lx->bufsize.act.noact.bytesleft <
                                bytesleft) {
                        cn_lx->bufsize.act.noact.bytesleft = bytesleft;
                }
        }

        if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
                rcv_delayed_lowbuf = 0;
                if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
                        cn_lx->bufsize.ignore_rcv_lowbuf = 0;
                else
                        cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
        }

        if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
                __u32 buf_increase_bytes = read_remaining_min - read_remaining;
                __u32 buf_increase;
                __u32 bufsize_end;

                if (high_latency_sender) {
                        if (buf_increase_bytes < rcvd / 16)
                                buf_increase_bytes = rcvd / 16;
                } else {
                        if (buf_increase_bytes < rcvd / 32)
                                buf_increase_bytes = rcvd / 32;
                }

                buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
                if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
                                buf_increase_bytes))
                        buf_increase = U32_MAX;

                bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
                if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
                        bufsize_end = U32_MAX;

                if (cn_lx->bufsize.state != BUFSIZE_INCR &&
                                cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
                        cn_lx->bufsize.state = BUFSIZE_INCR;
                        cn_lx->bufsize.act.incr.size_start =
                                        cn_lx->bufsize.bufsize;
                        cn_lx->bufsize.act.incr.size_end = 0;
                }

                if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
                        cn_lx->bufsize.act.incr.size_end = bufsize_end;
                if (bufsize_end / 4 > cn_lx->bufsize.act.incr.size_start)
                        cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
        } else if (rcv_delayed_lowbuf &&
                        read_remaining < read_remaining_min_nodecr) {
                if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
                                cn_lx->bufsize.state == BUFSIZE_DECR ||
                                cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
                        __u32 bytesleft = 0;
                        __u8 rtt_mul = 2;

                        if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
                                bytesleft = cn_lx->bufsize.act.noact.bytesleft;
                        if (high_latency_conn)
                                rtt_mul *= 2;
                        if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
                                        rtt_mul > bytesleft))
                                bytesleft = (cn_lx->bufsize.bufsize >>
                                                BUFSIZE_SHIFT) * rtt_mul;

                        cn_lx->bufsize.state = BUFSIZE_NOACTION;
                        cn_lx->bufsize.act.noact.bytesleft = bytesleft;
                }
        } else if (rcv_delayed_lowbuf &&
                        read_remaining < read_remaining_min_nofastdecr) {
                if (cn_lx->bufsize.state == BUFSIZE_DECR ||
                                cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
                        cn_lx->bufsize.state = BUFSIZE_DECR;
                        cn_lx->bufsize.act.decr.size_start =
                                        cn_lx->bufsize.bufsize;
                }
        }

        _cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
                        high_latency_conn);
}
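/*
 * Called when buffered data is handed towards a socket. If userspace
 * has been slow to read for longer than latency_limit jiffies, lowbuf
 * signals are ignored for roughly one buffer's worth of received data.
 */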
void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
        unsigned long jiffies_tmp = jiffies;
        __u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
                        HZ / 10 : HZ / 40);

        if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
                        trgt_sock_lx->target.sock.waiting_for_userspace_since,
                        jiffies - latency_limit)) {
                trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
                                trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
        }

        if (trgt_sock_lx->data_buf.read_remaining == 0) {
                trgt_sock_lx->target.sock.waiting_for_userspace = 0;
        } else {
                trgt_sock_lx->target.sock.waiting_for_userspace = 1;
                trgt_sock_lx->target.sock.waiting_for_userspace_since =
                                jiffies_tmp;
        }
}
static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
                struct cor_data_buf_item *item)
{
        BUG_ON(item == cn_lx->data_buf.nextread);
        list_del(&(item->buf_list));
        if (item->type == DATABUF_BUF) {
                cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
                                item->buflen - item->datalen;
        } else if (item->type == DATABUF_SKB) {
                cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
        } else {
                BUG();
        }
}
void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
        __u32 freed = 0;

        cn_lx->data_buf.next_read_offset = 0;
        cn_lx->data_buf.nextread = 0;

        while (!list_empty(&(cn_lx->data_buf.items))) {
                struct cor_data_buf_item *item = container_of(
                                cn_lx->data_buf.items.next,
                                struct cor_data_buf_item, buf_list);
                freed += item->datalen;

                cor_databuf_item_unlink(cn_lx, item);
                cor_databuf_item_free(item);
        }

        cn_lx->data_buf.datasize -= freed;
        cn_lx->data_buf.first_offset += freed;

        BUG_ON(cn_lx->data_buf.datasize != 0);
        BUG_ON(cn_lx->data_buf.overhead != 0);

        cn_lx->data_buf.read_remaining = 0;
}
void cor_reset_seqno(struct cor_conn *cn_l, __u64 initseqno)
{
        cn_l->data_buf.first_offset = initseqno -
                        cn_l->data_buf.datasize +
                        cn_l->data_buf.read_remaining;
}
static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
        BUG_ON(cn_lx->data_buf.nextread == 0);
        BUG_ON(cn_lx->data_buf.next_read_offset !=
                        cn_lx->data_buf.nextread->datalen);

        if (&(cn_lx->data_buf.nextread->buf_list) ==
                        cn_lx->data_buf.items.prev) {
                BUG_ON(cn_lx->data_buf.read_remaining != 0);
                cn_lx->data_buf.nextread = 0;
        } else {
                BUG_ON(cn_lx->data_buf.read_remaining == 0);
                cn_lx->data_buf.nextread = container_of(
                                cn_lx->data_buf.nextread->buf_list.next,
                                struct cor_data_buf_item, buf_list);
        }

        cn_lx->data_buf.next_read_offset = 0;
}
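/*
 * Copy len bytes from the current read position into dst, advancing
 * nextread/next_read_offset chunk by chunk. The caller must ensure
 * that read_remaining >= len.
 */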
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
        BUG_ON(cn_lx->data_buf.read_remaining < len);

        while (len > 0) {
                int cpy = len;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                BUG_ON(cn_lx->data_buf.nextread == 0);
                BUG_ON(cn_lx->data_buf.next_read_offset >=
                                cn_lx->data_buf.nextread->datalen);

                srcbufcpystart = cn_lx->data_buf.nextread->buf +
                                cn_lx->data_buf.next_read_offset;
                srcbufcpylen = cn_lx->data_buf.nextread->datalen -
                                cn_lx->data_buf.next_read_offset;

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                memcpy(dst, srcbufcpystart, cpy);

                dst += cpy;
                len -= cpy;

                cn_lx->data_buf.read_remaining -= cpy;
                cn_lx->data_buf.next_read_offset += cpy;

                if (cpy == srcbufcpylen)
                        cor_databuf_nextreadchunk(cn_lx);
        }
}
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
                struct cor_data_buf_item *item, __u16 next_read_offset)
{
        BUG_ON(next_read_offset > item->datalen);

        if (next_read_offset >= item->datalen)
                goto free;

        spin_lock_bh(&(trgt_sock->rcv_lock));

        if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
                spin_unlock_bh(&(trgt_sock->rcv_lock));
                goto free;
        }

        BUG_ON(trgt_sock->data_buf.nextread != 0 &&
                        &(trgt_sock->data_buf.nextread->buf_list) !=
                        trgt_sock->data_buf.items.next);
        BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

        trgt_sock->data_buf.first_offset -= item->datalen;
        trgt_sock->data_buf.datasize += item->datalen;
        trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

        if (item->type == DATABUF_BUF) {
                trgt_sock->data_buf.overhead +=
                                sizeof(struct cor_data_buf_item) +
                                item->buflen - item->datalen;
        } else if (item->type == DATABUF_SKB) {
                trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
        } else {
                BUG();
        }

        list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

        trgt_sock->data_buf.nextread = item;
        trgt_sock->data_buf.next_read_offset = next_read_offset;

        cor_account_bufspace(trgt_sock);

        spin_unlock_bh(&(trgt_sock->rcv_lock));

        if (0) {
free:
                cor_databuf_item_free(item);
        }
}
void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
        struct cor_data_buf_item *dbi = 0;

        BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
        BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

        if (trgt_sock_l->data_buf.read_remaining == 0)
                return;

        BUG_ON(trgt_sock_l->data_buf.nextread == 0);
        BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
                        trgt_sock_l->data_buf.nextread->datalen);
        dbi = trgt_sock_l->data_buf.nextread;

        BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

        cs_rl->data.conn_raw.rcvitem = dbi;
        cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

        trgt_sock_l->data_buf.first_offset += dbi->datalen;
        trgt_sock_l->data_buf.datasize -= dbi->datalen;
        trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

        cor_account_bufspace(trgt_sock_l);

        trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
        cor_databuf_nextreadchunk(trgt_sock_l);

        cor_databuf_item_unlink(trgt_sock_l, dbi);
}
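/*
 * Move the read position back by bytes, walking the item list
 * backwards from nextread.
 */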
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
        trgt_out_l->data_buf.read_remaining += bytes;

        BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

        if (trgt_out_l->data_buf.nextread == 0) {
                BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

                trgt_out_l->data_buf.nextread = container_of(
                                trgt_out_l->data_buf.items.prev,
                                struct cor_data_buf_item, buf_list);
        }

        while (bytes > trgt_out_l->data_buf.next_read_offset) {
                bytes -= trgt_out_l->data_buf.next_read_offset;
                trgt_out_l->data_buf.nextread = container_of(
                                trgt_out_l->data_buf.nextread->buf_list.prev,
                                struct cor_data_buf_item, buf_list);
                BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
                                &(trgt_out_l->data_buf.items));
                trgt_out_l->data_buf.next_read_offset =
                                trgt_out_l->data_buf.nextread->datalen;
        }

        trgt_out_l->data_buf.next_read_offset -= bytes;
}
void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u64 startpos, char *dst,
                int len)
{
        __u64 pos = trgt_out_l->data_buf.first_offset;
        struct cor_data_buf_item *dbi = container_of(
                        trgt_out_l->data_buf.items.next,
                        struct cor_data_buf_item, buf_list);

        while (1) {
                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                if (cor_seqno_after(pos + dbi->datalen, startpos))
                        break;

                pos += dbi->datalen;
                dbi = container_of(dbi->buf_list.next,
                                struct cor_data_buf_item, buf_list);
        }

        while (len > 0) {
                int cpy = len;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                __u64 offset = cor_seqno_clean(startpos - pos);

                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                BUG_ON(cor_seqno_before(startpos, pos));
                BUG_ON(offset > dbi->datalen);

                srcbufcpystart = dbi->buf + offset;
                srcbufcpylen = dbi->datalen - offset;

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                memcpy(dst, srcbufcpystart, cpy);

                dst += cpy;
                len -= cpy;
                startpos += cpy;

                pos += dbi->datalen;
                dbi = container_of(dbi->buf_list.next,
                                struct cor_data_buf_item, buf_list);
        }
}
/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u64 pos)
{
        __u32 acked = 0;

        while (!list_empty(&(trgt_out_l->data_buf.items))) {
                struct cor_data_buf_item *firstitem = container_of(
                                trgt_out_l->data_buf.items.next,
                                struct cor_data_buf_item, buf_list);

                if (firstitem == trgt_out_l->data_buf.nextread)
                        break;

                if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
                                firstitem->datalen, pos))
                        break;

                trgt_out_l->data_buf.first_offset += firstitem->datalen;
                acked += firstitem->datalen;

                cor_databuf_item_unlink(trgt_out_l, firstitem);
                cor_databuf_item_free(firstitem);
        }

        trgt_out_l->data_buf.datasize -= acked;

        BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
                        trgt_out_l->data_buf.overhead != 0);

        if (acked != 0)
                cor_account_bufspace(trgt_out_l);
}
void cor_databuf_ackread(struct cor_conn *cn_lx)
{
        __u32 acked = 0;

        while (!list_empty(&(cn_lx->data_buf.items))) {
                struct cor_data_buf_item *firstitem = container_of(
                                cn_lx->data_buf.items.next,
                                struct cor_data_buf_item, buf_list);

                if (firstitem == cn_lx->data_buf.nextread)
                        break;

                acked += firstitem->datalen;

                cor_databuf_item_unlink(cn_lx, firstitem);
                cor_databuf_item_free(firstitem);
        }

        cn_lx->data_buf.datasize -= acked;
        cn_lx->data_buf.first_offset += acked;

        BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

        if (acked != 0)
                cor_account_bufspace(cn_lx);
}
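/*
 * Append datalen bytes to the databuf, reusing space left in the last
 * DATABUF_BUF item and allocating new items as needed. Returns the
 * number of bytes actually copied; on allocation failure this may be
 * less than datalen, in which case the flush flag is dropped.
 */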
__u32 cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
                int rcv_delayed_lowbuf, __u8 flush)
{
        struct cor_data_buf_item *item = 0;

        __u32 totalcpy = 0;

        if (list_empty(&(cn_lx->data_buf.items)) == 0) {
                struct list_head *last = cn_lx->data_buf.items.prev;

                item = container_of(last, struct cor_data_buf_item, buf_list);
        }

        while (datalen > 0) {
                __u32 cpy = datalen;

                BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
                BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

                if (item == 0 || item->type != DATABUF_BUF ||
                                item->buflen <= item->datalen) {
                        item = kmem_cache_alloc(cor_data_buf_item_slab,
                                        GFP_ATOMIC);
                        if (unlikely(item == 0))
                                break;

                        memset(item, 0, sizeof(struct cor_data_buf_item));
                        item->type = DATABUF_BUF;

                        item->buflen = cor_buf_optlen(datalen);
                        item->buf = kmalloc(item->buflen, GFP_ATOMIC);

                        if (unlikely(item->buf == 0)) {
                                kmem_cache_free(cor_data_buf_item_slab, item);
                                break;
                        }
                        item->datalen = 0;

                        list_add_tail(&(item->buf_list),
                                        &(cn_lx->data_buf.items));

                        cn_lx->data_buf.overhead += item->buflen +
                                        sizeof(struct cor_data_buf_item);
                }

                BUG_ON(item->type != DATABUF_BUF);
                BUG_ON(item->buflen <= item->datalen);

                if (cn_lx->data_buf.nextread == 0) {
                        cn_lx->data_buf.nextread = item;
                        cn_lx->data_buf.next_read_offset = item->datalen;
                }

                if (item->buflen - item->datalen < cpy)
                        cpy = (item->buflen - item->datalen);

                memcpy(item->buf + item->datalen, buf, cpy);
                item->datalen += cpy;

                BUG_ON(cpy > datalen);
                buf += cpy;
                datalen -= cpy;
                totalcpy += cpy;

                cn_lx->data_buf.read_remaining += cpy;
                cn_lx->data_buf.datasize += cpy;
                cn_lx->data_buf.overhead -= cpy;
                BUG_ON(cn_lx->data_buf.datasize != 0 &&
                                cn_lx->data_buf.overhead == 0);
        }

        if (datalen != 0)
                flush = 0;
        cn_lx->flush = flush;

        cor_account_bufspace(cn_lx);
        cor_bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

        return totalcpy;
}
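/*
 * Queue an skb as a DATABUF_SKB item. Skbs whose payload is smaller
 * than the sk_buff bookkeeping overhead plus the space left in the
 * last buffer item are copied via cor_receive_buf() and freed instead.
 * Returns the number of bytes queued.
 */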
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
                int rcv_delayed_lowbuf, __u8 flush)
{
        struct cor_skb_procstate *ps = cor_skb_pstate(skb);
        struct cor_data_buf_item *item = &(ps->funcstate.rcv.dbi);

        __u32 bufferleft = 0;

        BUG_ON(skb->len <= 0);

        if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
                        (1 << 30)) || unlikely(src_in_l->data_buf.overhead >
                        (1 << 30))))
                return 0;

        if (list_empty(&(src_in_l->data_buf.items)) == 0) {
                struct list_head *last = src_in_l->data_buf.items.prev;
                struct cor_data_buf_item *lastitem = container_of(last,
                                struct cor_data_buf_item, buf_list);

                bufferleft = lastitem->buflen - lastitem->datalen;
        }

        if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
                __u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
                                rcv_delayed_lowbuf, flush);

                if (likely(rc == skb->len))
                        kfree_skb(skb);
                return rc;
        }

        memset(item, 0, sizeof(struct cor_data_buf_item));

        item->type = DATABUF_SKB;
        item->buf = skb->data;
        item->datalen = skb->len;
        item->buflen = item->datalen;
        list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
        if (src_in_l->data_buf.nextread == 0)
                src_in_l->data_buf.nextread = item;

        src_in_l->data_buf.read_remaining += item->datalen;
        src_in_l->data_buf.datasize += item->datalen;
        src_in_l->data_buf.overhead += sizeof(struct sk_buff);

        cor_account_bufspace(src_in_l);
        cor_bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

        src_in_l->flush = flush;

        return skb->len;
}
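/*
 * Notify the data source that buffer space may be available again:
 * process pending control packets (SOURCE_UNCONNECTED), flush a
 * managed socket and signal write space (SOURCE_SOCK), or drain the
 * out-of-order queue and ack if needed (SOURCE_IN).
 */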
void cor_wake_sender(struct cor_conn *cn)
{
        spin_lock_bh(&(cn->rcv_lock));

        if (unlikely(cn->isreset)) {
                spin_unlock_bh(&(cn->rcv_lock));
                return;
        }

        switch (cn->sourcetype) {
        case SOURCE_UNCONNECTED:
                spin_unlock_bh(&(cn->rcv_lock));
                spin_lock_bh(&(cn->reversedir->rcv_lock));
                if (likely(cn->reversedir->isreset == 0 &&
                                cn->reversedir->targettype ==
                                TARGET_UNCONNECTED))
                        cor_proc_cpacket(cn->reversedir);
                spin_unlock_bh(&(cn->reversedir->rcv_lock));
                break;
        case SOURCE_SOCK:
                if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
                                cn->source.sock.cs != 0 /* &&
                                cor_sock_sndbufavailable(cn) */)
                        cor_sk_write_space(cn->source.sock.cs);
                spin_unlock_bh(&(cn->rcv_lock));
                break;
        case SOURCE_IN:
                cor_drain_ooo_queue(cn);
                if (likely(cn->source.in.established != 0)) {
                        cor_send_ack_conn_ifneeded(cn, 0, 0);
                }
                spin_unlock_bh(&(cn->rcv_lock));
                break;
        default:
                BUG();
        }
}
int __init cor_forward_init(void)
{
        cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
                        sizeof(struct cor_data_buf_item), 8, 0, 0);
        if (unlikely(cor_data_buf_item_slab == 0))
                return -ENOMEM;

        atomic64_set(&cor_bufused_sum, 0);

        return 0;
}
void __exit cor_forward_exit2(void)
{
        BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

        kmem_cache_destroy(cor_data_buf_item_slab);
        cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");