/**
 * Connection oriented routing
 * Copyright (C) 2007-2021 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *cor_data_buf_item_slab;

atomic64_t cor_bufused_sum;

/* __u64 get_bufspace_used(void)
{
	return atomic64_read(&cor_bufused_sum);
} */

void cor_databuf_init(struct cor_conn *cn_init)
{
	memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
	INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

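/* bufsize.bufsize is kept in fixed point with BUFSIZE_SHIFT (== 5)
 * fractional bits, i.e. in 1/32 byte units; shifting right by
 * BUFSIZE_SHIFT yields whole bytes. This lets the update logic below
 * grow or shrink the buffer by fractions of a byte per byte received.
 */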
void cor_bufsize_init(struct cor_conn *cn_l, __u32 bufsize)
{
	__u32 bufsize_shifted;

	memset(&(cn_l->bufsize), 0, sizeof(cn_l->bufsize));

	if (unlikely((bufsize >> (32 - BUFSIZE_SHIFT)) != 0))
		bufsize_shifted = U32_MAX;
	else
		bufsize_shifted = bufsize << BUFSIZE_SHIFT;

	cn_l->bufsize.bufsize = bufsize_shifted;
	cn_l->bufsize.state = BUFSIZE_NOACTION;
	cn_l->bufsize.act.noact.bytesleft = bufsize * 4;
}

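/* Recompute the buffer space charged to this conn (payload, item
 * overhead and, for SOURCE_IN, reorder memory) and fold the difference
 * into the global cor_bufused_sum. Returns nonzero if the conn is at or
 * above BUFUSAGE_PER_CONN_MAX or the global sum is at or above
 * BUFUSAGE_GLOBAL_MAX, i.e. if buffer space is tight.
 */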
int cor_account_bufspace(struct cor_conn *cn_lx)
{
	__u64 space_needed = 0;
	__u32 space_req;
	__u64 bufused_sum_int;

	if (likely(cn_lx->isreset == 0)) {
		space_needed += cn_lx->data_buf.datasize;
		space_needed += cn_lx->data_buf.overhead;
		if (cn_lx->sourcetype == SOURCE_IN) {
			space_needed += cn_lx->src.in.reorder_memused;
		}
	}

	if (cn_lx->bufspace_accounted == space_needed) {
		if (space_needed >= BUFUSAGE_PER_CONN_MAX) {
			return 1;
		} else if (atomic64_read(&cor_bufused_sum) >=
				BUFUSAGE_GLOBAL_MAX) {
			return 1;
		} else {
			return 0;
		}
	}

	if (unlikely(space_needed >= U32_MAX))
		space_req = U32_MAX;
	else
		space_req = space_needed;

	if (cn_lx->bufspace_accounted == space_req)
		return 1;

	bufused_sum_int = cor_update_atomic_sum(&cor_bufused_sum,
			cn_lx->bufspace_accounted, space_req);

	cn_lx->bufspace_accounted = space_req;

	if (cn_lx->targettype == TARGET_OUT && unlikely(
			unlikely(cn_lx->trgt.out.in_nb_busy_list == 0) !=
			unlikely(cn_lx->data_buf.datasize == 0)))
		cor_conn_set_last_act(cn_lx);

	if (space_needed != space_req)
		return 1;
	else if (bufused_sum_int >= BUFUSAGE_GLOBAL_MAX)
		return 1;
	else if (space_needed >= BUFUSAGE_PER_CONN_MAX)
		return 1;
	else
		return 0;
}

int cor_conn_src_unconn_write_allowed(struct cor_conn *src_unconn_lx)
{
	BUG_ON(src_unconn_lx->sourcetype != SOURCE_UNCONNECTED);

	if (src_unconn_lx->data_buf.datasize == 0)
		return 1;
	else if (src_unconn_lx->data_buf.datasize < BUFFERLIMIT_SRC_UNCONN &&
			cor_account_bufspace(src_unconn_lx) == 0)
		return 1;
	else
		return 0;
}

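/* Recalculate the receive window (src.in.window_seqnolimit) from the
 * current bufsize, clamped to the per-conn minimum/maximum and reduced
 * by the data that is still buffered unread.
 */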
void cor_update_windowlimit(struct cor_conn *src_in_lx)
{
	__u32 bufsize;

	BUG_ON(src_in_lx->sourcetype != SOURCE_IN);

	bufsize = src_in_lx->bufsize.bufsize >> BUFSIZE_SHIFT;

	if (src_in_lx->targettype != TARGET_OUT) {
		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	} else if (cor_seqno_before_eq(src_in_lx->trgt.out.seqno_windowlimit,
			src_in_lx->trgt.out.seqno_nextsend)) {
		if (cor_account_bufspace(src_in_lx)) {
			bufsize = min(bufsize, (__u32) WINDOW_MAX_PER_CONN_MIN);
		}
	} else {
		__u32 windowleft = (__u32) min((__u64) U32_MAX,
				cor_seqno_clean(
				src_in_lx->trgt.out.seqno_windowlimit -
				src_in_lx->trgt.out.seqno_nextsend));

		bufsize = max(bufsize, min(windowleft,
				(__u32) WINDOW_MAX_PER_CONN_MIN_OUT_WINOK));

		if (bufsize < WINDOW_MAX_PER_CONN_MIN ||
				cor_account_bufspace(src_in_lx)) {
			bufsize = WINDOW_MAX_PER_CONN_MIN;
		}
	}

	if (bufsize > WINDOW_MAX_PER_CONN_MAX)
		bufsize = WINDOW_MAX_PER_CONN_MAX;

	/* printk(KERN_ERR "window %p %u %u\n", src_in_lx, bufsize,
			src_in_lx->data_buf.read_remaining); */

	if (unlikely(src_in_lx->data_buf.read_remaining > bufsize))
		bufsize = 0;
	else
		bufsize -= src_in_lx->data_buf.read_remaining;

	if (unlikely(src_in_lx->targettype == TARGET_DISCARD))
		bufsize = 0;

	src_in_lx->src.in.window_seqnolimit =
			src_in_lx->src.in.next_seqno + bufsize;
}

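/* A sender counts as high latency if the estimated conn ack delay
 * (retransmit latency + its standard deviation + the maximum
 * low-latency conn ack delay) exceeds 100ms.
 */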
static int cor_bufsize_high_latency_sender(struct cor_conn *cn_lx)
{
	struct cor_neighbor *nb;

	__u64 latency_us;

	if (cn_lx->sourcetype != SOURCE_IN)
		return 0;

	nb = cn_lx->src.in.nb;
	if (unlikely(nb == 0))
		return 0;

	latency_us = atomic_read(&(nb->latency_retrans_us));
	latency_us += atomic_read(&(nb->latency_stddev_retrans_us));
	latency_us += CMSG_MAXDELAY_ACKCONN_LOWLATENCY_MS * 1000;

	return latency_us > 100000 ? 1 : 0;
}

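/* Encode the current grow/shrink tendency of bufsize for the remote
 * side: 128 means "no change", smaller values mean shrinking, larger
 * ones growing. The result is biased by -64 so that the neutral value
 * fits a __u8 (64 on the wire); the receiver adds the 64 back.
 */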
__u8 _cor_bufsize_update_get_changerate(struct cor_conn *cn_lx)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);

	__u32 changerate;

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		changerate = 128;
	} else if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 4;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;
		if (high_latency_sender)
			speed *= 2;
		if (high_latency_conn)
			speed /= 2;

		changerate = 128 - speed;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 4;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		changerate = 128 + speed;
	} else {
		BUG();
	}

	/* printk(KERN_ERR "changerate1 %u\n", changerate); */

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 remote_changerate = ((__u16)
				cn_lx->trgt.out.remote_bufsize_changerate) + 64;
		/* printk(KERN_ERR "changerate2 %u\n", remote_changerate); */
		changerate = (changerate * remote_changerate) / 128;
		/* printk(KERN_ERR "changerate3 %u\n", changerate); */
	}

	if (unlikely(changerate < 64))
		return 0;
	else if (unlikely(changerate - 64 >= 256))
		return 255;
	else
		return (__u8) (changerate - 64);
}

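/* Apply the pending BUFSIZE_* state for "rcvd" newly received bytes:
 * first use up the NOACTION byte budget, then shrink or grow
 * bufsize.bufsize at a speed scaled by sender/conn latency and, while
 * growing, by the initial phase.
 */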
static void _cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		int high_latency_sender, int high_latency_conn)
{
	/* speed of increase/decrease changes with BUFSIZE_SHIFT */
	BUG_ON(BUFSIZE_SHIFT != 5);

	/*
	 * If you change the speed here, change it in
	 * _cor_bufsize_update_get_changerate too
	 */

	if (cn_lx->bufsize.state == BUFSIZE_NOACTION) {
		if (likely(cn_lx->bufsize.act.noact.bytesleft >= rcvd)) {
			cn_lx->bufsize.act.noact.bytesleft -= rcvd;
			return;
		}

		rcvd -= cn_lx->bufsize.act.noact.bytesleft;
		cn_lx->bufsize.state = BUFSIZE_DECR;
		cn_lx->bufsize.act.decr.size_start = cn_lx->bufsize.bufsize;
	}

	if (cn_lx->bufsize.state == BUFSIZE_DECR ||
			cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (cn_lx->bufsize.state == BUFSIZE_DECR_FAST)
			speed *= 2;

		if (high_latency_sender)
			speed *= 2;

		change = rcvd * speed;
		if (high_latency_conn)
			change /= 2;

		if (cn_lx->bufsize.bufsize < change)
			cn_lx->bufsize.bufsize = 0;
		else
			cn_lx->bufsize.bufsize -= change;

		if (cn_lx->bufsize.act.decr.size_start/4 >
				cn_lx->bufsize.bufsize)
			cn_lx->bufsize.state = BUFSIZE_DECR_FAST;
	} else if (cn_lx->bufsize.state == BUFSIZE_INCR ||
			cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
		__u8 speed = 1;
		__u32 change;

		if (high_latency_sender)
			speed *= 2;

		if (unlikely(cor_bufsize_initial_phase(cn_lx)))
			speed *= 4;
		else if (cn_lx->bufsize.state == BUFSIZE_INCR_FAST)
			speed *= 2;

		change = rcvd * speed;
		if (cn_lx->bufsize.bufsize + change < cn_lx->bufsize.bufsize)
			cn_lx->bufsize.bufsize = U32_MAX;
		else
			cn_lx->bufsize.bufsize += change;

		if (cn_lx->bufsize.bufsize >=
				cn_lx->bufsize.act.incr.size_end) {
			cn_lx->bufsize.bufsize =
					cn_lx->bufsize.act.incr.size_end;
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			if (high_latency_conn) {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-3));
			} else {
				cn_lx->bufsize.act.noact.bytesleft =
						(cn_lx->bufsize.bufsize >>
						(BUFSIZE_SHIFT-2));
			}
		}
	} else {
		BUG();
	}

	if (unlikely(rcvd >= (1 << 24)) ||
			cn_lx->bufsize.bytes_rcvd + rcvd >= (1 << 24))
		cn_lx->bufsize.bytes_rcvd = (1 << 24) - 1;
	else
		cn_lx->bufsize.bytes_rcvd += rcvd;
}

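/* Unread-data threshold below which the buffer counts as running low
 * and bufsize should grow: a fraction of bufsize that is larger for
 * high latency senders/conns and smaller while global buffer usage is
 * above 3/4 of BUFUSAGE_GLOBAL_MAX.
 */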
static __u32 cor_get_read_remaining_min(__u32 bufsize_bytes,
		int high_latency_sender, int high_latency_conn)
{
	int bufspace_low = (atomic64_read(&cor_bufused_sum) >=
			3*(BUFUSAGE_GLOBAL_MAX/4));

	if (high_latency_conn) {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/3 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/8 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		}
	} else {
		if (high_latency_sender) {
			if (bufspace_low) {
				return bufsize_bytes/6 + 1;
			} else {
				return bufsize_bytes/4 + 1;
			}
		} else {
			if (bufspace_low) {
				return bufsize_bytes/12 + 1;
			} else {
				return bufsize_bytes/8 + 1;
			}
		}
	}
}

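/* Steer the BUFSIZE_* state from how much unread data remained when
 * "rcvd" bytes arrived: below read_remaining_min start (or speed up) an
 * increase, below read_remaining_min_nodecr park in NOACTION, below
 * read_remaining_min_nofastdecr downgrade a fast decrease. A received
 * flush and the remote side's changerate can likewise cancel or slow a
 * pending decrease/increase before _cor_bufsize_update applies the
 * result.
 */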
static void cor_bufsize_update(struct cor_conn *cn_lx, __u32 rcvd,
		__u8 windowused, __u8 rcv_flushrcvd)
{
	int high_latency_sender = cor_bufsize_high_latency_sender(cn_lx);
	int high_latency_conn = (cn_lx->is_highlatency != 0);
	__u32 bufsize_bytes = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);
	__u32 read_remaining_min = cor_get_read_remaining_min(bufsize_bytes,
			high_latency_sender, high_latency_conn);
	__u32 read_remaining_min_nofastdecr = read_remaining_min*2;
	__u32 read_remaining_min_nodecr = (read_remaining_min +
			read_remaining_min_nofastdecr)/2;
	__u32 read_remaining;

	BUG_ON(cn_lx->data_buf.read_remaining < rcvd);
	BUG_ON(windowused > 31);

	if (cn_lx->bufsize.ignore_rcv_lowbuf > 0) {
		if (rcvd > cn_lx->bufsize.ignore_rcv_lowbuf)
			cn_lx->bufsize.ignore_rcv_lowbuf = 0;
		else
			cn_lx->bufsize.ignore_rcv_lowbuf -= rcvd;

		read_remaining = bufsize_bytes;
	} else {
		read_remaining = max(cn_lx->data_buf.read_remaining - rcvd,
				(bufsize_bytes * (31 - windowused)) / 31);
	}

	if (rcv_flushrcvd != 0) {
		__u32 bytesleft = (cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT);

		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		} else if (cn_lx->bufsize.state == BUFSIZE_NOACTION &&
				cn_lx->bufsize.act.noact.bytesleft <
				bytesleft) {
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	}

	if (read_remaining < read_remaining_min) {
		__u32 buf_increase_bytes = read_remaining_min - read_remaining;
		__u32 buf_increase;
		__u32 bufsize_end;

		if (high_latency_sender) {
			if (buf_increase_bytes < rcvd/16)
				buf_increase_bytes = rcvd/16;
		} else {
			if (buf_increase_bytes < rcvd/32)
				buf_increase_bytes = rcvd/32;
		}

		buf_increase = (buf_increase_bytes << BUFSIZE_SHIFT);
		if (unlikely((buf_increase >> BUFSIZE_SHIFT) !=
				buf_increase_bytes))
			buf_increase = U32_MAX;

		bufsize_end = cn_lx->bufsize.bufsize + buf_increase;
		if (unlikely(bufsize_end < cn_lx->bufsize.bufsize))
			bufsize_end = U32_MAX;

		if (cn_lx->bufsize.state != BUFSIZE_INCR &&
				cn_lx->bufsize.state != BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
			cn_lx->bufsize.act.incr.size_end = 0;
		}

		if (bufsize_end > cn_lx->bufsize.act.incr.size_end)
			cn_lx->bufsize.act.incr.size_end = bufsize_end;
		if (bufsize_end/4 > cn_lx->bufsize.act.incr.size_start)
			cn_lx->bufsize.state = BUFSIZE_INCR_FAST;
	} else if (read_remaining < read_remaining_min_nodecr) {
		if (cn_lx->bufsize.state == BUFSIZE_NOACTION ||
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			__u32 bytesleft = 0;
			__u8 rtt_mul = 2;

			if (cn_lx->bufsize.state == BUFSIZE_NOACTION)
				bytesleft = cn_lx->bufsize.act.noact.bytesleft;
			if (high_latency_conn)
				rtt_mul *= 2;
			if (likely((cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT) *
					rtt_mul > bytesleft))
				bytesleft = (cn_lx->bufsize.bufsize >>
						BUFSIZE_SHIFT) * rtt_mul;

			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft = bytesleft;
		}
	} else if (read_remaining < read_remaining_min_nofastdecr) {
		if (cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	if (cn_lx->targettype == TARGET_OUT) {
		__u16 rem_changerate = ((__u16)
				cn_lx->trgt.out.remote_bufsize_changerate) + 64;
		__u16 crate_nofastdecr;
		__u16 crate_nodecr;
		__u16 crate_nofastincr;
		__u16 crate_noincr;

		if (high_latency_conn) {
			crate_nofastdecr = 128 - 128/8;
			crate_nodecr = 128 - 128/6;
			crate_nofastincr = 128 + 128/4;
			crate_noincr = 128 + 128/3;
		} else {
			crate_nofastdecr = 128 - 128/16;
			crate_nodecr = 128 - 128/12;
			crate_nofastincr = 128 + 128/8;
			crate_noincr = 128 + 128/6;
		}

		if ((rem_changerate < crate_nodecr ||
				rem_changerate > crate_noincr) &&
				cn_lx->bufsize.state == BUFSIZE_NOACTION) {
			cn_lx->bufsize.act.noact.bytesleft = max(
					cn_lx->bufsize.act.noact.bytesleft,
					cn_lx->bufsize.bufsize >>
					BUFSIZE_SHIFT);
		}

		if (rem_changerate < crate_nodecr && (
				cn_lx->bufsize.state == BUFSIZE_DECR ||
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate < crate_nofastdecr &&
				cn_lx->bufsize.state == BUFSIZE_DECR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_DECR;
			cn_lx->bufsize.act.decr.size_start =
					cn_lx->bufsize.bufsize;
		}

		if (rem_changerate > crate_noincr && (
				cn_lx->bufsize.state == BUFSIZE_INCR ||
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST)) {
			cn_lx->bufsize.state = BUFSIZE_NOACTION;
			cn_lx->bufsize.act.noact.bytesleft =
					cn_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
		}
		if (rem_changerate > crate_nofastincr &&
				cn_lx->bufsize.state == BUFSIZE_INCR_FAST) {
			cn_lx->bufsize.state = BUFSIZE_INCR;
			cn_lx->bufsize.act.incr.size_start =
					cn_lx->bufsize.bufsize;
		}
	}

	_cor_bufsize_update(cn_lx, rcvd, high_latency_sender,
			high_latency_conn);
}

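/* Bookkeeping for reads into a sock target: if userspace has been too
 * slow to read for more than latency_limit jiffies, disable the
 * low-buffer increase path for roughly one bufsize worth of incoming
 * data (ignore_rcv_lowbuf), so that a slow reader does not inflate
 * bufsize.
 */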
void cor_bufsize_read_to_sock(struct cor_conn *trgt_sock_lx)
{
	unsigned long jiffies_tmp = jiffies;
	__u32 latency_limit = (trgt_sock_lx->is_highlatency != 0 ?
			HZ/10 : HZ/40);

	if (trgt_sock_lx->trgt.sock.waiting_for_userspace != 0 && time_before(
			trgt_sock_lx->trgt.sock.waiting_for_userspace_since,
			jiffies - latency_limit)) {
		trgt_sock_lx->bufsize.ignore_rcv_lowbuf =
				trgt_sock_lx->bufsize.bufsize >> BUFSIZE_SHIFT;
	}

	if (trgt_sock_lx->data_buf.read_remaining == 0) {
		trgt_sock_lx->trgt.sock.waiting_for_userspace = 0;
	} else {
		trgt_sock_lx->trgt.sock.waiting_for_userspace = 1;
		trgt_sock_lx->trgt.sock.waiting_for_userspace_since =
				jiffies_tmp;
	}
}

static inline void cor_databuf_item_unlink(struct cor_conn *cn_lx,
		struct cor_data_buf_item *item)
{
	BUG_ON(item == cn_lx->data_buf.nextread);
	list_del(&(item->buf_list));
	if (item->type == DATABUF_BUF) {
		cn_lx->data_buf.overhead -= sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		cn_lx->data_buf.overhead -= sizeof(struct sk_buff);
	} else {
		BUG();
	}
}

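/* Drop the whole data_buf contents; afterwards datasize, overhead and
 * read_remaining are zero and first_offset has advanced past the freed
 * bytes.
 */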
void cor_databuf_ackdiscard(struct cor_conn *cn_lx)
{
	__u32 freed = 0;

	cn_lx->data_buf.next_read_offset = 0;
	cn_lx->data_buf.nextread = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *item = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		freed += item->datalen;

		cor_databuf_item_unlink(cn_lx, item);
		cor_databuf_item_free(item);
	}

	cn_lx->data_buf.datasize -= freed;
	cn_lx->data_buf.first_offset += freed;

	BUG_ON(cn_lx->data_buf.datasize != 0);
	BUG_ON(cn_lx->data_buf.overhead != 0);

	cn_lx->data_buf.read_remaining = 0;
}

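/* Rebase first_offset so that the next unread byte is addressed by
 * seqno initseqno.
 */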
void cor_reset_seqno(struct cor_conn *cn_l, __u64 initseqno)
{
	cn_l->data_buf.first_offset = initseqno -
			cn_l->data_buf.datasize +
			cn_l->data_buf.read_remaining;
}

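/* Advance nextread to the following item once the current one is fully
 * consumed, or clear it at the end of the list.
 */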
static void cor_databuf_nextreadchunk(struct cor_conn *cn_lx)
{
	BUG_ON(cn_lx->data_buf.nextread == 0);
	BUG_ON(cn_lx->data_buf.next_read_offset !=
			cn_lx->data_buf.nextread->datalen);

	if (&(cn_lx->data_buf.nextread->buf_list) ==
			cn_lx->data_buf.items.prev) {
		BUG_ON(cn_lx->data_buf.read_remaining != 0);

		cn_lx->data_buf.nextread = 0;
	} else {
		BUG_ON(cn_lx->data_buf.read_remaining == 0);

		cn_lx->data_buf.nextread = container_of(
				cn_lx->data_buf.nextread->buf_list.next,
				struct cor_data_buf_item, buf_list);
	}

	cn_lx->data_buf.next_read_offset = 0;
}

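/* Copy len bytes from the current read position to dst, advancing
 * nextread/next_read_offset chunk by chunk.
 */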
void cor_databuf_pull(struct cor_conn *cn_lx, char *dst, __u32 len)
{
	BUG_ON(cn_lx->data_buf.read_remaining < len);

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		BUG_ON(cn_lx->data_buf.nextread == 0);
		BUG_ON(cn_lx->data_buf.next_read_offset >=
				cn_lx->data_buf.nextread->datalen);

		srcbufcpystart = cn_lx->data_buf.nextread->buf +
				cn_lx->data_buf.next_read_offset;
		srcbufcpylen = cn_lx->data_buf.nextread->datalen -
				cn_lx->data_buf.next_read_offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;

		cn_lx->data_buf.read_remaining -= cpy;
		cn_lx->data_buf.next_read_offset += cpy;

		if (cpy == srcbufcpylen)
			cor_databuf_nextreadchunk(cn_lx);
	}
}

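/* Put a partially consumed item back at the head of the sock target's
 * data_buf, restoring the bookkeeping; if it was fully consumed or the
 * conn/sock pairing changed meanwhile, the item is freed instead.
 */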
void cor_databuf_unpull_dpi(struct cor_conn *trgt_sock, struct cor_sock *cs,
		struct cor_data_buf_item *item, __u16 next_read_offset)
{
	BUG_ON(next_read_offset > item->datalen);

	if (next_read_offset >= item->datalen)
		goto free;

	spin_lock_bh(&(trgt_sock->rcv_lock));

	if (unlikely(cor_is_trgt_sock(trgt_sock, cs) == 0)) {
		spin_unlock_bh(&(trgt_sock->rcv_lock));
		goto free;
	}

	BUG_ON(trgt_sock->data_buf.nextread != 0 &&
			&(trgt_sock->data_buf.nextread->buf_list) !=
			trgt_sock->data_buf.items.next);
	BUG_ON(trgt_sock->data_buf.next_read_offset != 0);

	trgt_sock->data_buf.first_offset -= item->datalen;
	trgt_sock->data_buf.datasize += item->datalen;
	trgt_sock->data_buf.read_remaining += item->datalen - next_read_offset;

	if (item->type == DATABUF_BUF) {
		trgt_sock->data_buf.overhead +=
				sizeof(struct cor_data_buf_item) +
				item->buflen - item->datalen;
	} else if (item->type == DATABUF_SKB) {
		trgt_sock->data_buf.overhead += sizeof(struct sk_buff);
	} else {
		BUG();
	}

	list_add(&(item->buf_list), &(trgt_sock->data_buf.items));

	trgt_sock->data_buf.nextread = item;
	trgt_sock->data_buf.next_read_offset = next_read_offset;

	cor_account_bufspace(trgt_sock);

	spin_unlock_bh(&(trgt_sock->rcv_lock));

	if (0) {
free:
		cor_databuf_item_free(item);
	}
}

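/* Detach the next unread item from the conn and hand it to the raw
 * socket (cs_rl->data.conn_raw.rcvitem); the counterpart of
 * cor_databuf_unpull_dpi, which puts such an item back.
 */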
void cor_databuf_pull_dbi(struct cor_sock *cs_rl, struct cor_conn *trgt_sock_l)
{
	struct cor_data_buf_item *dbi = 0;

	BUG_ON(cs_rl->type != CS_TYPE_CONN_RAW);
	BUG_ON(cs_rl->data.conn_raw.rcvitem != 0);

	if (trgt_sock_l->data_buf.read_remaining == 0)
		return;

	BUG_ON(trgt_sock_l->data_buf.nextread == 0);
	BUG_ON(trgt_sock_l->data_buf.next_read_offset >=
			trgt_sock_l->data_buf.nextread->datalen);
	dbi = trgt_sock_l->data_buf.nextread;

	BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);

	cs_rl->data.conn_raw.rcvitem = dbi;
	cs_rl->data.conn_raw.rcvoffset = trgt_sock_l->data_buf.next_read_offset;

	trgt_sock_l->data_buf.first_offset += dbi->datalen;
	trgt_sock_l->data_buf.datasize -= dbi->datalen;
	trgt_sock_l->data_buf.read_remaining -= dbi->datalen;

	cor_account_bufspace(trgt_sock_l);

	trgt_sock_l->data_buf.next_read_offset = dbi->datalen;
	cor_databuf_nextreadchunk(trgt_sock_l);

	cor_databuf_item_unlink(trgt_sock_l, dbi);
}

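/* Move the read position of a trgt_out conn back by "bytes", walking
 * the item list backwards from nextread, presumably so already-read
 * data can be sent again.
 */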
void cor_databuf_unpull(struct cor_conn *trgt_out_l, __u32 bytes)
{
	trgt_out_l->data_buf.read_remaining += bytes;

	BUG_ON(list_empty(&(trgt_out_l->data_buf.items)) != 0);

	if (trgt_out_l->data_buf.nextread == 0) {
		BUG_ON(trgt_out_l->data_buf.next_read_offset != 0);

		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.items.prev,
				struct cor_data_buf_item, buf_list);
	}

	while (bytes > trgt_out_l->data_buf.next_read_offset) {
		bytes -= trgt_out_l->data_buf.next_read_offset;
		trgt_out_l->data_buf.nextread = container_of(
				trgt_out_l->data_buf.nextread->buf_list.prev,
				struct cor_data_buf_item, buf_list);
		BUG_ON(&(trgt_out_l->data_buf.nextread->buf_list) ==
				&(trgt_out_l->data_buf.items));
		trgt_out_l->data_buf.next_read_offset =
				trgt_out_l->data_buf.nextread->datalen;
	}

	trgt_out_l->data_buf.next_read_offset -= bytes;
}

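/* Copy len bytes starting at seqno startpos from data that has already
 * been read past, presumably for retransmission; such items are only
 * freed by cor_databuf_ack.
 */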
void cor_databuf_pullold(struct cor_conn *trgt_out_l, __u64 startpos, char *dst,
		int len)
{
	__u64 pos = trgt_out_l->data_buf.first_offset;
	struct cor_data_buf_item *dbi = container_of(
			trgt_out_l->data_buf.items.next,
			struct cor_data_buf_item, buf_list);

	while (1) {
		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		if (cor_seqno_after(pos + dbi->datalen, startpos))
			break;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}

	while (len > 0) {
		int cpy = len;

		char *srcbufcpystart = 0;
		int srcbufcpylen = 0;

		__u64 offset = cor_seqno_clean(startpos - pos);

		BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

		BUG_ON(cor_seqno_before(startpos, pos));
		BUG_ON(offset > dbi->datalen);

		srcbufcpystart = dbi->buf + offset;
		srcbufcpylen = dbi->datalen - offset;

		if (cpy > srcbufcpylen)
			cpy = srcbufcpylen;

		memcpy(dst, srcbufcpystart, cpy);

		dst += cpy;
		len -= cpy;
		startpos += cpy;

		pos += dbi->datalen;
		dbi = container_of(dbi->buf_list.next, struct cor_data_buf_item,
				buf_list);
	}
}

/* ack up to *not* including pos */
void cor_databuf_ack(struct cor_conn *trgt_out_l, __u64 pos)
{
	__u32 acked = 0;

	while (!list_empty(&(trgt_out_l->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				trgt_out_l->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == trgt_out_l->data_buf.nextread)
			break;

		if (cor_seqno_after_eq(trgt_out_l->data_buf.first_offset +
				firstitem->datalen, pos))
			break;

		trgt_out_l->data_buf.first_offset += firstitem->datalen;
		acked += firstitem->datalen;

		cor_databuf_item_unlink(trgt_out_l, firstitem);
		cor_databuf_item_free(firstitem);
	}

	trgt_out_l->data_buf.datasize -= acked;

	BUG_ON(trgt_out_l->data_buf.datasize == 0 &&
			trgt_out_l->data_buf.overhead != 0);

	if (unlikely(trgt_out_l->trgt.out.nblist_busy_remaining <= acked)) {
		trgt_out_l->trgt.out.nblist_busy_remaining = 0;
		cor_conn_set_last_act(trgt_out_l);
	} else {
		trgt_out_l->trgt.out.nblist_busy_remaining -= acked;
	}

	if (acked != 0)
		cor_account_bufspace(trgt_out_l);
}

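/* Like cor_databuf_ack, but acks everything already read (up to
 * nextread) instead of up to an explicit seqno.
 */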
void cor_databuf_ackread(struct cor_conn *cn_lx)
{
	__u32 acked = 0;

	while (!list_empty(&(cn_lx->data_buf.items))) {
		struct cor_data_buf_item *firstitem = container_of(
				cn_lx->data_buf.items.next,
				struct cor_data_buf_item, buf_list);

		if (firstitem == cn_lx->data_buf.nextread)
			break;

		acked += firstitem->datalen;

		cor_databuf_item_unlink(cn_lx, firstitem);
		cor_databuf_item_free(firstitem);
	}

	cn_lx->data_buf.datasize -= acked;
	cn_lx->data_buf.first_offset += acked;

	BUG_ON(cn_lx->data_buf.datasize == 0 && cn_lx->data_buf.overhead != 0);

	if (cn_lx->targettype == TARGET_OUT) {
		if (unlikely(cn_lx->trgt.out.nblist_busy_remaining <=
				acked)) {
			cn_lx->trgt.out.nblist_busy_remaining = 0;
			cor_conn_set_last_act(cn_lx);
		} else {
			cn_lx->trgt.out.nblist_busy_remaining -= acked;
		}
	}

	if (acked != 0)
		cor_account_bufspace(cn_lx);
}

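/* Append datalen bytes to the data_buf, filling trailing space in the
 * last DATABUF_BUF item and allocating further items with GFP_ATOMIC
 * as needed. Returns the number of bytes actually buffered; on
 * allocation failure it stops early and the flush flag is only kept if
 * everything fit.
 */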
__u32 _cor_receive_buf(struct cor_conn *cn_lx, char *buf, __u32 datalen,
		int from_sock, __u8 windowused, __u8 flush)
{
	struct cor_data_buf_item *item = 0;

	__u32 totalcpy = 0;

	if (list_empty(&(cn_lx->data_buf.items)) == 0) {
		struct list_head *last = cn_lx->data_buf.items.prev;

		item = container_of(last, struct cor_data_buf_item, buf_list);
	}

	while (datalen > 0) {
		__u32 cpy = datalen;

		BUG_ON(cn_lx->data_buf.datasize + datalen > (1 << 30));
		BUG_ON(cn_lx->data_buf.overhead > (1 << 30));

		if (item == 0 || item->type != DATABUF_BUF ||
				item->buflen <= item->datalen) {
			item = kmem_cache_alloc(cor_data_buf_item_slab,
					GFP_ATOMIC);
			if (unlikely(item == 0))
				break;

			memset(item, 0, sizeof(struct cor_data_buf_item));
			item->type = DATABUF_BUF;

			item->buflen = cor_buf_optlen(datalen, from_sock);
			item->buf = kmalloc(item->buflen, GFP_ATOMIC);

			if (unlikely(item->buf == 0)) {
				kmem_cache_free(cor_data_buf_item_slab, item);
				break;
			}
			item->datalen = 0;

			list_add_tail(&(item->buf_list),
					&(cn_lx->data_buf.items));

			cn_lx->data_buf.overhead += item->buflen +
					sizeof(struct cor_data_buf_item);
		}

		BUG_ON(item->type != DATABUF_BUF);
		BUG_ON(item->buflen <= item->datalen);

		if (cn_lx->data_buf.nextread == 0) {
			cn_lx->data_buf.nextread = item;
			cn_lx->data_buf.next_read_offset = item->datalen;
		}

		if (item->buflen - item->datalen < cpy)
			cpy = (item->buflen - item->datalen);

		memcpy(item->buf + item->datalen, buf, cpy);
		item->datalen += cpy;

		BUG_ON(cpy > datalen);
		buf += cpy;
		datalen -= cpy;
		totalcpy += cpy;

		cn_lx->data_buf.read_remaining += cpy;
		cn_lx->data_buf.datasize += cpy;
		cn_lx->data_buf.overhead -= cpy;
		BUG_ON(cn_lx->data_buf.datasize != 0 &&
				cn_lx->data_buf.overhead == 0);
	}

	if (datalen != 0)
		flush = 0;
	cn_lx->flush = flush;

	cor_account_bufspace(cn_lx);
	cor_bufsize_update(cn_lx, totalcpy, windowused, flush);

	return totalcpy;
}

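/* Take ownership of an skb and queue it as a DATABUF_SKB item without
 * copying. Skbs smaller than the sk_buff overhead they would carry
 * (minus space already free in the last item) are copied via
 * cor_receive_buf and freed instead.
 */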
__u32 cor_receive_skb(struct cor_conn *src_in_l, struct sk_buff *skb,
		__u8 windowused, __u8 flush)
{
	struct cor_skb_procstate *ps = cor_skb_pstate(skb);
	struct cor_data_buf_item *item = &(ps->funcstate.rcv.dbi);

	__u32 bufferleft = 0;

	BUG_ON(skb->len <= 0);

	if (unlikely(unlikely(src_in_l->data_buf.datasize + skb->len >
			(1 << 30)) || unlikely(src_in_l->data_buf.overhead >
			(1 << 30))))
		return 0;

	if (list_empty(&(src_in_l->data_buf.items)) == 0) {
		struct list_head *last = src_in_l->data_buf.items.prev;
		struct cor_data_buf_item *item = container_of(last,
				struct cor_data_buf_item, buf_list);

		bufferleft = item->buflen - item->datalen;
	}

	if (skb->len < (sizeof(struct sk_buff) + bufferleft)) {
		__u32 rc = cor_receive_buf(src_in_l, skb->data, skb->len,
				windowused, flush);

		if (likely(rc == skb->len))
			kfree_skb(skb);

		return rc;
	}

	memset(item, 0, sizeof(struct cor_data_buf_item));

	item->type = DATABUF_SKB;
	item->buf = skb->data;
	item->datalen = skb->len;
	item->buflen = item->datalen;
	list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
	if (src_in_l->data_buf.nextread == 0)
		src_in_l->data_buf.nextread = item;

	src_in_l->data_buf.read_remaining += item->datalen;
	src_in_l->data_buf.datasize += item->datalen;
	src_in_l->data_buf.overhead += sizeof(struct sk_buff);

	cor_account_bufspace(src_in_l);
	cor_bufsize_update(src_in_l, skb->len, windowused, flush);

	src_in_l->flush = flush;

	return skb->len;
}

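/* Buffer space was freed or data was consumed: depending on the source
 * type, resume cpacket processing on the reverse conn, flush a managed
 * socket and signal write space, or drain the out-of-order queue and
 * send a conn ack if needed.
 */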
void cor_wake_sender(struct cor_conn *cn)
{
	spin_lock_bh(&(cn->rcv_lock));

	if (unlikely(cn->isreset)) {
		spin_unlock_bh(&(cn->rcv_lock));
		return;
	}

	switch (cn->sourcetype) {
	case SOURCE_UNCONNECTED:
		spin_unlock_bh(&(cn->rcv_lock));
		spin_lock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
		if (likely(cor_get_conn_reversedir(cn)->isreset == 0 &&
				cor_get_conn_reversedir(cn)->targettype ==
				TARGET_UNCONNECTED))
			cor_proc_cpacket(cor_get_conn_reversedir(cn));
		spin_unlock_bh(&(cor_get_conn_reversedir(cn)->rcv_lock));
		break;
	case SOURCE_SOCK:
		if (_cor_mngdsocket_flushtoconn(cn) == RC_FTC_OK &&
				cn->src.sock.ed->cs != 0 &&
				cor_sock_sndbufavailable(cn, 1))
			cor_sk_write_space(cn->src.sock.ed->cs);
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	case SOURCE_IN:
		cor_drain_ooo_queue(cn);
		if (likely(cn->src.in.established != 0)) {
			cor_send_ack_conn_ifneeded(cn, 0, 0);
		}
		spin_unlock_bh(&(cn->rcv_lock));
		break;
	default:
		BUG();
	}
}

int __init cor_forward_init(void)
{
	cor_data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
			sizeof(struct cor_data_buf_item), 8, 0, 0);
	if (unlikely(cor_data_buf_item_slab == 0))
		return -ENOMEM;

	atomic64_set(&cor_bufused_sum, 0);

	return 0;
}

void __exit cor_forward_exit2(void)
{
	BUG_ON(atomic64_read(&cor_bufused_sum) != 0);

	kmem_cache_destroy(cor_data_buf_item_slab);
	cor_data_buf_item_slab = 0;
}

MODULE_LICENSE("GPL");