/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */

void cor_databuf_init(struct cor_conn *cn_init)
{
	memset(&cn_init->data_buf, 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&cn_init->data_buf.items);
}
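
/*
 * Initialize the announced receive buffer size. bufsize is stored shifted
 * left by BUFSIZE_SHIFT (a fixed-point representation); the byte value is
 * bufsize >> BUFSIZE_SHIFT.
 */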
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&cn_l->bufsize, 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << 5;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}
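
/*
 * Account this connection's buffer usage (data + overhead, plus reorder
 * memory for SOURCE_IN) in cor_bufused_sum. Returns nonzero if buffering
 * should be throttled because the per-connection or global limit is reached,
 * 0 if more buffering is acceptable.
 */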
int cor_account_bufspace(struct cor_conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->src.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&cor_bufused_sum) >=
				BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (cn_lx->targettype == TARGET_OUT && unlikely(
			(cn_lx->trgt.out.in_nb_busy_list == 0) !=
			(cn_lx->data_buf.datasize == 0)))
		cor_conn_set_last_act(cn_lx);

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cor_conn_src_unconn_write_allowed(struct cor_conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_SRC_UNCONN &&
			cor_account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}
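
/*
 * Recompute the receive window of a SOURCE_IN connection: start from bufsize
 * (converted to bytes), clamp it to the per-connection min/max, subtract data
 * that is buffered but not yet read, and store the result relative to
 * next_seqno in src.in.window_seqnolimit.
 */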
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (cor_seqno_before_eq(src_in_lx->trgt.out.seqno_windowlimit,
			src_in_lx->trgt.out.seqno_nextsend)) {
		if (cor_account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = src_in_lx->trgt.out.seqno_windowlimit -
				src_in_lx->trgt.out.seqno_nextsend;

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->src.in.window_seqnolimit =
			src_in_lx->src.in.next_seqno + bufsize;
}
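
/*
 * Returns 1 if the sending neighbor's estimated retransmit latency plus
 * stddev plus the ack-conn message delay exceeds 100ms, 0 otherwise.
 */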
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
	struct cor_neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->src.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&nb->latency_retrans_us);
	latency_us += atomic_read(&nb->latency_stddev_retrans_us);
	latency_us += CMSG_MAXDELAY_ACKCONN_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}
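
/*
 * Encode the current bufsize grow/shrink tendency as a change rate around
 * the neutral value 128 (128 == no change), fold in the remote side's
 * reported change rate for TARGET_OUT connections, and return the result
 * offset by -64 so it fits into a __u8.
 */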
__u8 _cor_bufsize_update_get_changerate(struct cor_conn *cn_lx)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);

	__u32 changerate;

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		changerate = 128;
	} else if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 4;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;
		if (high_latency_sender)
			speed *= 2;
		if (high_latency_conn)
			speed /= 2;

		changerate = 128 - speed;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 4;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		changerate = 128 + speed;
	} else {
		BUG();
	}

	/* printk(KERN_ERR "changerate1 %u\n", changerate); */

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 remote_changerate = ((__u16)
				cn_lx->trgt.out.remote_bufsize_changerate) + 64;
		/* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
		changerate = (changerate * remote_changerate) / 128;
		/* printk(KERN_ERR "changerate3 %u\n", changerate); */
	}

	if (unlikely(changerate < 64))
		return 0;
	else if (unlikely(changerate - 64 >= 256))
		return 255;
	else
		return (__u8) (changerate - 64);
}
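
/*
 * Apply the current bufsize state (NOACTION/DECR/INCR and their _FAST
 * variants) to the fixed-point bufsize for rcvd newly received bytes.
 */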
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUILD_BUG_ON(BUFSIZE_SHIFT != 5);

	/*
	 * If you change the speed here, change it in
	 * _cor_bufsize_update_get_changerate too
	 */

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start / 4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			BUILD_BUG_ON(BUFSIZE_SHIFT < 3);
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT - 3));
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT - 2));
			}
		}
	} else {
		BUG();
	}

	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}
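
/*
 * Threshold (in bytes) below which the buffered-but-unread data counts as
 * running low. The divisor depends on sender/connection latency and on
 * whether global buffer usage is already above 3/4 of BUFUSAGE_GLOBAL_MAX.
 */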
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
			3 * (BUFUSAGE_GLOBAL_MAX / 4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes / 6 + 1;
			} else {
				return bufsize_bytes / 3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes / 8 + 1;
			} else {
				return bufsize_bytes / 4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes / 6 + 1;
			} else {
				return bufsize_bytes / 4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes / 12 + 1;
			} else {
				return bufsize_bytes / 8 + 1;
			}
		}
	}
}
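
/*
 * Decide the next bufsize state from how much unread data remains relative
 * to the thresholds above and from the remote change rate, then let
 * _cor_bufsize_update() apply the change for the rcvd bytes.
 */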
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		__u8 windowused, __u8 rcv_flushrcvd)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min * 2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr) / 2;
	__u32 read_remaining;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);
	BUG_ON(windowused > 31);

	/* if (cn_lx->is_highlatency == 0)
		printk(KERN_ERR "bufsize %p %u %u %u %u %u %u\n",
				cn_lx, bufsize_bytes,
				cn_lx->data_buf.read_remaining, rcvd,
				windowused, rcv_flushrcvd,
				cn_lx->bufsize.ignore_rcv_lowbuf); */

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;

		read_remaining = bufsize_bytes;
	} else {
		read_remaining = max(cn_lx->data_buf.read_remaining - rcvd,
				(bufsize_bytes * (31 - windowused)) / 31);
	}

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);

		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd / 16)
				buf_increase_bytes = rcvd / 16;
		} else {
			if (buf_increase_bytes < rcvd / 32)
				buf_increase_bytes = rcvd / 32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end / 4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;

			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 rem_changerate = ((__u16)
				cn_lx->trgt.out.remote_bufsize_changerate) + 64;
		__u16 crate_nofastdecr;
		__u16 crate_nodecr;
		__u16 crate_nofastincr;
		__u16 crate_noincr;

		if (high_latency_conn) {
			crate_nofastdecr = 128 - 128 / 8;
			crate_nodecr = 128 - 128 / 6;
			crate_nofastincr = 128 + 128 / 4;
			crate_noincr = 128 + 128 / 3;
		} else {
			crate_nofastdecr = 128 - 128 / 16;
			crate_nodecr = 128 - 128 / 12;
			crate_nofastincr = 128 + 128 / 8;
			crate_noincr = 128 + 128 / 6;
		}

		if ((rem_changerate < crate_nodecr ||
				rem_changerate > crate_noincr) &&
				cn_lx->bufsize.state == BUFSIZE_NOACTION) {
			cn_lx->bufsize.act.noact.bytesleft = max(
					cn_lx->bufsize.act.noact.bytesleft,
					cn_lx->bufsize.bufsize >>
					BUFSIZE_SHIFT);
		}

		if (rem_changerate < crate_nodecr && (
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate < crate_nofastdecr &&
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}

		if (rem_changerate > crate_noincr && (
				cn_lx->bufsize.state == BUFSIZE_INCR ||
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate > crate_nofastincr &&
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
			high_latency_conn);
}
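
/*
 * Called on the read-to-socket path. If userspace has kept the connection
 * waiting longer than the latency limit, low-buffer handling is suppressed
 * for roughly one bufsize worth of data via ignore_rcv_lowbuf.
 */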
void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ / 10 : HZ / 40);

	/*
	 * High cpu usage may cause high latency of the userspace receiver.
	 * Increasing bufferspace to compensate may increase latency further.
	 */
	if (trgt_sock_lx->trgt.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->trgt.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->trgt.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->trgt.sock.waiting_for_userspace = 1;
		trgt_sock_lx->trgt.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
		struct cor_data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&item->buf_list);
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&cn_lx->data_buf.items)) {
		struct cor_data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);
		freed += item->datalen;

		cor_databuf_item_unlink(cn_lx, item);
		cor_databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

void cor_reset_seqno(struct cor_conn *cn_l, __u32 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&cn_lx->data_buf.nextread->buf_list == cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);
		cn_lx->data_buf.nextread = 0;

	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);
		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct cor_data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}
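
/*
 * Copy len bytes from the current read position into dst and advance
 * nextread / next_read_offset. The caller must ensure read_remaining >= len.
 */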
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			cor_databuf_nextreadchunk(cn_lx);
	}
}
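
/*
 * Put a partially read item back at the head of the target socket's data
 * buffer and make it the next item to read. Fully consumed items, or items
 * whose connection no longer belongs to the socket, are freed instead.
 */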
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
		struct cor_data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&trgt_sock->rcv_lock);

	if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&trgt_sock->rcv_lock);
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&trgt_sock->data_buf.nextread->buf_list !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead +=
				sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&item->buf_list, &trgt_sock->data_buf.items);

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	cor_account_bufspace(trgt_sock);

	spin_unlock_bh(&trgt_sock->rcv_lock);

	if (0) {
free:
		cor_databuf_item_free(item);
	}
}

void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
	struct cor_data_buf_item *dbi = 0;
	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&dbi->buf_list != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	cor_account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	cor_databuf_nextreadchunk(trgt_sock_l);

	cor_databuf_item_unlink(trgt_sock_l, dbi);
}
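
/*
 * Move the read position backwards by "bytes" (undo a pull), walking the
 * item list towards the front and growing read_remaining again.
 */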
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&trgt_out_l->data_buf.items) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct cor_data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct cor_data_buf_item, buf_list);
		BUG_ON(&trgt_out_l->data_buf.nextread->buf_list ==
				&trgt_out_l->data_buf.items);
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u32 startpos, char *dst,
		int len)
{
	__u32 pos = trgt_out_l->data_buf.first_offset;
	struct cor_data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct cor_data_buf_item, buf_list);

	while (1) {
		BUG_ON(&dbi->buf_list == &trgt_out_l->data_buf.items);

		if (cor_seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u32 offset = startpos - pos;

		BUG_ON(&dbi->buf_list == &trgt_out_l->data_buf.items);

		BUG_ON(cor_seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u32 pos)
{
	__u32 acked = 0;

	while (!list_empty(&trgt_out_l->data_buf.items)) {
		struct cor_data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		cor_databuf_item_unlink(trgt_out_l, firstitem);
		cor_databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (unlikely(trgt_out_l->trgt.out.nblist_busy_remaining <= acked)) {
		trgt_out_l->trgt.out.nblist_busy_remaining = 0;
		cor_conn_set_last_act(trgt_out_l);
	} else {
		trgt_out_l->trgt.out.nblist_busy_remaining -= acked;
	}

	if (acked != 0)
		cor_account_bufspace(trgt_out_l);
}

void cor_databuf_ackread(struct cor_conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&cn_lx->data_buf.items)) {
		struct cor_data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		cor_databuf_item_unlink(cn_lx, firstitem);
		cor_databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (cn_lx->targettype == TARGET_OUT) {
		if (unlikely(cn_lx->trgt.out.nblist_busy_remaining <=
				acked)) {
			cn_lx->trgt.out.nblist_busy_remaining = 0;
			cor_conn_set_last_act(cn_lx);
		} else {
			cn_lx->trgt.out.nblist_busy_remaining -= acked;
		}
	}

	if (acked != 0)
		cor_account_bufspace(cn_lx);
}
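
/*
 * Append datalen bytes to the receive buffer, reusing space in the tail item
 * when possible and allocating new DATABUF_BUF items otherwise. Returns the
 * number of bytes actually copied (less than datalen if allocation fails).
 */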
__u32 _cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
		int from_sock, __u8 windowused, __u8 flush)
{
	struct cor_data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&cn_lx->data_buf.items) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;

		item = container_of(last, struct cor_data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(cor_data_buf_item_slab,
					GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct cor_data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = cor_buf_optlen(datalen, from_sock);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(cor_data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&item->buf_list, &cn_lx->data_buf.items);

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct cor_data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	cor_account_bufspace(cn_lx);
	cor_bufsize_update(cn_lx, totalcpy, windowused, flush);

	return totalcpy;
}
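
/*
 * Receive an skb into the data buffer. Small skbs (or those that fit into
 * the space left in the last buffer item) are copied via cor_receive_buf();
 * larger ones are kept as DATABUF_SKB items referencing the skb data
 * directly.
 */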
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
		__u8 windowused, __u8 flush)
{
	struct cor_skb_procstate *ps = cor_skb_pstate(skb);
	struct cor_data_buf_item *item = &ps->funcstate.rcv.dbi;

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&src_in_l->data_buf.items) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct cor_data_buf_item *item = container_of(last,
				struct cor_data_buf_item, buf_list);
		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
				windowused, flush);
		if (likely(rc == skb->len))
			kfree_skb(skb);
		return rc;
	}

	memset(item, 0, sizeof(struct cor_data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&item->buf_list, &src_in_l->data_buf.items);
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	cor_account_bufspace(src_in_l);
	cor_bufsize_update(src_in_l, skb->len, windowused, flush);

	src_in_l->flush = flush;

	return skb->len;
}
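
/*
 * Wake the data source of this connection: SOURCE_UNCONNECTED processes the
 * control packet on the reverse direction, SOURCE_SOCK flushes the managed
 * socket and signals write space, SOURCE_IN drains the out-of-order queue
 * and sends a conn ack if needed.
 */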
void cor_wake_sender(struct cor_conn *cn)
{
	spin_lock_bh(&cn->rcv_lock);

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&cn->rcv_lock);
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&cn->rcv_lock);
		spin_lock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		if (likely(cor_get_conn_reversedir(cn)->isreset == 0 &&
				cor_get_conn_reversedir(cn)->targettype ==
				TARGET_UNCONNECTED))
			cor_proc_cpacket(cor_get_conn_reversedir(cn));
		spin_unlock_bh(&cor_get_conn_reversedir(cn)->rcv_lock);
		break;
	case SOURCE_SOCK:
		if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
				cn->src.sock.ed->cs != 0 &&
				cor_sock_sndbufavailable(cn, 1))
			cor_sk_write_space(cn->src.sock.ed->cs);
		spin_unlock_bh(&cn->rcv_lock);
		break;
	case SOURCE_IN:
		cor_drain_ooo_queue(cn);
		if (likely(cn->src.in.established != 0))
			cor_send_ack_conn_ifneeded(cn, 0, 0);
		spin_unlock_bh(&cn->rcv_lock);
		break;
	default:
		BUG();
	}
}

int __init cor_forward_init(void)
{
	cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct cor_data_buf_item), 8, 0, 0);
	if (unlikely(cor_data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&cor_bufused_sum, 0);

	return 0;
}

void __exit cor_forward_exit2(void)
{
	BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

	kmem_cache_destroy(cor_data_buf_item_slab);
	cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");