net/cor/forward.c
/**
 * Connection oriented routing
 * Copyright (C) 2007-2019 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *data_buf_item_slab;

atomic64_t bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&bufused_sum);
} */

void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

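/*
 * Descriptive note (editorial): bufsize.bufsize is kept in a fixed-point
 * form, shifted left by BUFSIZE_SHIFT (5), so that the gradual
 * increase/decrease steps applied in _bufsize_update() below can be finer
 * than one byte.  For example, a requested bufsize of 4096 bytes is stored
 * as 4096 << 5 = 131072, and readers use bufsize >> BUFSIZE_SHIFT to get
 * bytes back.  Values that would overflow the shift saturate at U32_MAX.
 */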
void bufsize_init(struct conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

#warning todo reset stalled conns on low memory
/**
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset connection which has been in this list longest
 */

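/*
 * A minimal sketch of that idea, for illustration only.  The list head, the
 * lock, the "stalled_list" member of struct conn and the reset helper named
 * here are hypothetical, not part of the current code:
 *
 *	static LIST_HEAD(stalled_conns);
 *	static DEFINE_SPINLOCK(stalled_conns_lock);
 *
 *	static void reset_longest_stalled_conn(void)
 *	{
 *		struct conn *cn = 0;
 *
 *		spin_lock_bh(&stalled_conns_lock);
 *		if (!list_empty(&stalled_conns))
 *			cn = container_of(stalled_conns.next, struct conn,
 *					stalled_list);
 *		spin_unlock_bh(&stalled_conns_lock);
 *
 *		if (cn != 0)
 *			reset_conn(cn);
 *	}
 *
 * Connections would be appended to stalled_conns once read_remaining grows
 * to at least twice the advertised window, so the list stays ordered by how
 * long each connection has been stalled.
 */
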
int account_bufspace(struct conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&bufused_sum) >= BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = update_atomic_sum(&bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cpacket_write_allowed(struct conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}

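/*
 * Descriptive note (editorial): recompute the receive window advertised to
 * the sending side.  Start from the connection's bufsize (in bytes), clamp
 * it between the per-connection minimum and maximum (taking buffer
 * accounting and, for TARGET_OUT, the outgoing window into consideration),
 * subtract the data already buffered but not yet read, and advertise the
 * result relative to the next expected sequence number.
 */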
void update_windowlimit(struct conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}

static int bufsize_high_latency_sender(struct conn *cn_lx)
{
	struct neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}

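/*
 * Descriptive note (editorial) on the bufsize state machine implemented
 * below: BUFSIZE_NOACTION keeps the size unchanged until
 * act.noact.bytesleft received bytes have been consumed, then switches to
 * BUFSIZE_DECR.  BUFSIZE_DECR/BUFSIZE_DECR_FAST shrink bufsize in
 * proportion to the bytes received (FAST once it has dropped below a
 * quarter of act.decr.size_start).  BUFSIZE_INCR/BUFSIZE_INCR_FAST grow it
 * towards act.incr.size_end and then return to NOACTION.  A high-latency
 * sender doubles the step size; a high-latency connection halves the
 * decrease step.
 */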
static void _bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					(cn_lx->bufsize.bufsize >>
					(BUFSIZE_SHIFT-2));
		}
	} else {
		BUG();
	}
}

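/*
 * Descriptive note (editorial): threshold on unread data used by
 * bufsize_update() below.  The divisor applied to bufsize_bytes depends on
 * connection/sender latency and on whether global buffer usage is already
 * at 3/4 of its maximum (bufspace_low):
 *
 *					bufspace_low	otherwise
 *	high latency conn + sender	1/6		1/3
 *	high latency conn only		1/8		1/4
 *	high latency sender only	1/6		1/4
 *	neither				1/12		1/8
 */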
static __u32 get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}

static void bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;
			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_bufsize_update(cn_lx, rcvd, high_latency_sender, high_latency_conn);
}

void bufsize_read_to_sock(struct conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

static inline void databuf_item_unlink(struct conn *cn_lx,
		struct data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

void databuf_ackdiscard(struct conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);
		freed += item->datalen;

		databuf_item_unlink(cn_lx, item);
		databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void reset_seqno(struct conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

static void databuf_nextreadchunk(struct conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

void databuf_pull(struct conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_lx);
	}
}

void databuf_unpull_dpi(struct conn *trgt_sock, struct cor_sock *cs,
		struct data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead += sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	if (0) {
free:
		databuf_item_free(item);
	}
}

void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l)
{
	struct data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	databuf_nextreadchunk(trgt_sock_l);

	databuf_item_unlink(trgt_sock_l, dbi);
}

void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

void databuf_pullold(struct conn *trgt_out_l, __u64 startpos, char *dst,
		int len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		databuf_item_unlink(trgt_out_l, firstitem);
		databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(trgt_out_l);
}

void databuf_ackread(struct conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		databuf_item_unlink(cn_lx, firstitem);
		databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(cn_lx);
}

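/*
 * Descriptive note (editorial): copy datalen bytes from buf into the
 * connection's data_buf.  New bytes are appended to the last partially
 * filled DATABUF_BUF item where possible, otherwise fresh items are
 * allocated with GFP_ATOMIC.  Returns the number of bytes actually
 * buffered; on allocation failure this can be less than datalen, in which
 * case the flush flag is not taken over.
 */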
__u32 receive_buf(struct conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	account_bufspace(cn_lx);
	bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}

__u32 receive_skb(struct conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct data_buf_item *item = container_of(last,
				struct data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);
		return rc;
	}

	memset(item, 0, sizeof(struct data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	account_bufspace(src_in_l);
	bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}

void wake_sender(struct conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (cn->source.sock.cs != 0 /*&& cor_sock_sndbufavailable(cn)*/)
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		drain_ooo_queue(cn);
		if (likely(cn->source.in.established != 0)) {
			send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
	if (unlikely(data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&bufused_sum, 0);

	return 0;
}

MODULE_LICENSE("GPL");