faster bufsize increase on new connections
[cor.git] / net / cor / forward.c
blob 157f111432109e66ff3e2b77fb7979610f6a0b0b
/**
 * Connection oriented routing
 * Copyright (C) 2007-2019 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *data_buf_item_slab;

atomic64_t bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&bufused_sum);
} */

void databuf_init(struct conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

void bufsize_init(struct conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

#warning todo reset stalled conns on low memory
/**
 * create a list of all connections with target_out and read_remaining >=
 * window*2, reset the connection which has been in this list the longest
 */
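/*
 * account_bufspace - charge this connection's buffer usage against the limits
 *
 * Recomputes the space held by the connection (data_buf.datasize,
 * data_buf.overhead and, for SOURCE_IN, the reorder memory), folds the
 * difference to the previously accounted value into the global bufused_sum
 * and stores the new value in bufspace_accounted. Returns 1 if the
 * per-connection or the global limit is exceeded, 0 otherwise.
 */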
int account_bufspace(struct conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->source.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&bufused_sum) >= BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = update_atomic_sum(&bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cpacket_write_allowed(struct conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_CPACKETS &&
			account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}

void update_windowlimit(struct conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (seqno_before_eq(src_in_lx->target.out.seqno_windowlimit,
			src_in_lx->target.out.seqno_nextsend)) {
		if (account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				seqno_clean(
				src_in_lx->target.out.seqno_windowlimit -
				src_in_lx->target.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u", src_in_lx, bufsize, src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->source.in.window_seqnolimit =
			src_in_lx->source.in.next_seqno + bufsize;
}

static int bufsize_high_latency_sender(struct conn *cn_lx)
{
	struct neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->source.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}

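/*
 * Buffer size state machine (cn_lx->bufsize.state):
 * BUFSIZE_NOACTION consumes act.noact.bytesleft before switching to
 * BUFSIZE_DECR; BUFSIZE_DECR/BUFSIZE_DECR_FAST shrink bufsize in proportion
 * to the bytes received, switching to the fast variant once it has dropped
 * below a quarter of act.decr.size_start; BUFSIZE_INCR/BUFSIZE_INCR_FAST grow
 * it towards act.incr.size_end and then return to BUFSIZE_NOACTION. The
 * speeds are scaled by the high latency sender/connection flags.
 */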
static void _bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (cn_lx->bufsize.bytes_rcvd != (1 << 24) - 1 &&
				cn_lx->bufsize.bytes_rcvd <
				cn_lx->bufsize.bufsize)
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-3));
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-2));
			}
		}
	} else {
		BUG();
	}

	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}

static __u32 get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}

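/*
 * bufsize_update compares the unread data (read_remaining) against three
 * thresholds derived from get_read_remaining_min(): below read_remaining_min
 * a delayed-lowbuf receive starts (or speeds up) an increase, below
 * read_remaining_min_nodecr it only blocks decreasing, and below
 * read_remaining_min_nofastdecr it downgrades a fast decrease to a normal
 * one. A flush received from the sender cancels a pending decrease instead.
 */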
static void bufsize_update(struct conn *cn_lx, __u32 rcvd,
		int rcv_delayed_lowbuf, __u8 rcv_flushrcvd)
{
	int high_latency_sender = bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining = cn_lx->data_buf.read_remaining - rcvd;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		rcv_delayed_lowbuf = 0;
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;
	}

	if (rcv_delayed_lowbuf && read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;
			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (rcv_delayed_lowbuf &&
			read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_bufsize_update(cn_lx, rcvd, high_latency_sender, high_latency_conn);
}

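/*
 * Called when buffered data is handed to a socket reader. If userspace has
 * been waiting on the data for longer than the latency limit, receive-side
 * low-buffer signals are ignored for roughly one bufsize worth of incoming
 * data (ignore_rcv_lowbuf), presumably so that a slow reader does not
 * inflate the buffer.
 */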
void bufsize_read_to_sock(struct conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->target.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->target.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->target.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->target.sock.waiting_for_userspace = 1;
		trgt_sock_lx->target.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

static inline void databuf_item_unlink(struct conn *cn_lx,
		struct data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

void databuf_ackdiscard(struct conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);
		freed += item->datalen;

		databuf_item_unlink(cn_lx, item);
		databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void reset_seqno(struct conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

static void databuf_nextreadchunk(struct conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;

	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

void databuf_pull(struct conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			databuf_nextreadchunk(cn_lx);
	}
}

void databuf_unpull_dpi(struct conn *trgt_sock, struct cor_sock *cs,
		struct data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead += sizeof(struct data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	if (0) {
free:
		databuf_item_free(item);
	}
}

void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l)
{
	struct data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	databuf_nextreadchunk(trgt_sock_l);

	databuf_item_unlink(trgt_sock_l, dbi);
}

void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

void databuf_pullold(struct conn *trgt_out_l, __u64 startpos, char *dst,
		int len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		databuf_item_unlink(trgt_out_l, firstitem);
		databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(trgt_out_l);
}

void databuf_ackread(struct conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		databuf_item_unlink(cn_lx, firstitem);
		databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (acked != 0)
		account_bufspace(cn_lx);
}

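/*
 * receive_buf copies incoming data into the connection's data_buf: it tops
 * up the last partially filled DATABUF_BUF item and allocates additional
 * items (GFP_ATOMIC) as needed, updates datasize/overhead/read_remaining,
 * re-accounts the buffer space and feeds the copied amount into
 * bufsize_update(). Returns the number of bytes actually buffered, which may
 * be less than datalen if an allocation fails.
 */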
__u32 receive_buf(struct conn *cn_lx, char *buf, __u32 datalen,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;
		item = container_of(last, struct data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = buf_optlen(datalen);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	account_bufspace(cn_lx);
	bufsize_update(cn_lx, totalcpy, rcv_delayed_lowbuf, flush);

	return totalcpy;
}

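/*
 * receive_skb keeps the skb itself as a DATABUF_SKB item when that is
 * cheaper; if the payload is smaller than the sk_buff overhead plus the free
 * space in the last buffer item, the data is copied via receive_buf() and
 * the skb is freed instead.
 */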
__u32 receive_skb(struct conn *src_in_l, struct sk_buff *skb,
		int rcv_delayed_lowbuf, __u8 flush)
{
	struct skb_procstate *ps = skb_pstate(skb);
	struct data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct data_buf_item *item = container_of(last,
				struct data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = receive_buf(src_in_l, skb->data, skb->len,
				rcv_delayed_lowbuf, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);
		return rc;
	}

	memset(item, 0, sizeof(struct data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	account_bufspace(src_in_l);
	bufsize_update(src_in_l, skb->len, rcv_delayed_lowbuf, flush);

	src_in_l->flush = flush;

	return skb->len;
}

void wake_sender(struct conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cn->reversedir->rcv_lock));
		if (likely(cn->reversedir->isreset == 0 &&
				cn->reversedir->targettype ==
				TARGET_UNCONNECTED))
			proc_cpacket(cn->reversedir);
		spin_unlock_bh(&(cn->reversedir->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (cn->source.sock.cs != 0 /*&& cor_sock_sndbufavailable(cn)*/)
			cor_sk_write_space(cn->source.sock.cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		drain_ooo_queue(cn);
		if (likely(cn->source.in.established != 0)) {
			send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init forward_init(void)
{
	data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct data_buf_item), 8, 0, 0);
	if (unlikely(data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&bufused_sum, 0);

	return 0;
}

MODULE_LICENSE("GPL");