/* net/cor/forward.c */

/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <linux/mutex.h>

#include "cor.h"

struct kmem_cache *data_buf_item_slab;

#define PAGESIZE (1 << PAGE_SHIFT)

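/*
 * Buffered conn data lives in conn->data_buf: a list of data_buf_item
 * entries, each holding either a kmalloc'd buffer (TYPE_BUF) or a
 * queued sk_buff (TYPE_SKB).  totalsize counts buffered payload bytes,
 * overhead counts allocation slack and metadata, and lastread together
 * with last_read_offset marks the current read position.  The *_l
 * parameter names appear to denote conns whose rcv_lock is already
 * held by the caller.
 */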
void databuf_init(struct conn *cn_init)
{
        memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
        INIT_LIST_HEAD(&(cn_init->data_buf.items));
}

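/*
 * Unlink one item and free it together with its payload, subtracting
 * its share from the overhead accounting; the caller is responsible
 * for adjusting totalsize.
 */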
static void databuf_item_free(struct conn *cn_l, struct data_buf_item *item)
{
        if (item->type == TYPE_BUF) {
                kfree(item->data.buf.buf);
                cn_l->data_buf.overhead -= item->data.buf.buflen -
                                item->data.buf.datalen;
        } else if (item->type == TYPE_SKB) {
                kfree_skb(item->data.skb);
                cn_l->data_buf.overhead -= sizeof(struct sk_buff);
        } else {
                BUG();
        }

        cn_l->data_buf.overhead -= sizeof(struct data_buf_item);

        list_del(&(item->buf_list));

        kmem_cache_free(data_buf_item_slab, item);
}

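/*
 * Free all buffered items and release any remaining cpacket_buffer
 * reservation.
 */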
void databuf_free(struct conn *cn_l)
{
        while (!list_empty(&(cn_l->data_buf.items))) {
                struct data_buf_item *item = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);
                if (item->type == TYPE_BUF) {
                        cn_l->data_buf.totalsize -= item->data.buf.datalen;
                } else if (item->type == TYPE_SKB) {
                        cn_l->data_buf.totalsize -= item->data.skb->len;
                } else {
                        BUG();
                }

                databuf_item_free(cn_l, item);
        }

        BUG_ON(cn_l->data_buf.totalsize != 0);
        BUG_ON(cn_l->data_buf.overhead != 0);

        if (cn_l->data_buf.cpacket_buffer != 0) {
                free_cpacket_buffer(cn_l->data_buf.cpacket_buffer);
                cn_l->data_buf.cpacket_buffer = 0;
        }
}

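/*
 * Rebase first_offset so that the byte at the current read position
 * gets sequence number initseqno (this assumes that all fully read
 * items have already been freed).
 */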
void reset_seqno(struct conn *cn_l, __u32 initseqno)
{
        cn_l->data_buf.first_offset = initseqno -
                        cn_l->data_buf.last_read_offset;
}

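/*
 * Advance the read position to the next item: on the very first read,
 * point lastread at the first item; afterwards move it forward by one
 * item and restart at offset 0.
 */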
static void databuf_nextreadchunk(struct conn *cn_l)
{
        if (cn_l->data_buf.lastread == 0) {
                BUG_ON(cn_l->data_buf.last_read_offset != 0);
                BUG_ON(list_empty(&(cn_l->data_buf.items)));
                cn_l->data_buf.lastread = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);
        } else if (&(cn_l->data_buf.lastread->buf_list) !=
                        cn_l->data_buf.items.prev) {
                cn_l->data_buf.lastread = container_of(
                                cn_l->data_buf.lastread->buf_list.next,
                                struct data_buf_item, buf_list);

                cn_l->data_buf.last_read_offset = 0;
        }
}

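/*
 * Copy up to len bytes from the read position to dst and advance the
 * read position.  With userbuf != 0, dst is treated as a userspace
 * pointer.  Returns the number of bytes copied, or -EFAULT if nothing
 * could be copied.
 */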
static int _databuf_pull(struct conn *cn_l, char *dst, int len, int userbuf)
{
        int totalcpy = 0;

        BUG_ON(cn_l->data_buf.read_remaining < len);

        if (cn_l->data_buf.lastread == 0)
                databuf_nextreadchunk(cn_l);

        while (len > 0) {
                int rc = 0;
                int cpy = len;

                char *srcbuf = 0;
                int srcbuflen = 0;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                BUG_ON(cn_l->data_buf.lastread == 0);

                if (cn_l->data_buf.lastread->type == TYPE_BUF) {
                        srcbuf = cn_l->data_buf.lastread->data.buf.buf;
                        srcbuflen = cn_l->data_buf.lastread->data.buf.datalen;
                } else if (cn_l->data_buf.lastread->type == TYPE_SKB) {
                        srcbuf = cn_l->data_buf.lastread->data.skb->data;
                        srcbuflen = cn_l->data_buf.lastread->data.skb->len;
                } else {
                        BUG();
                }

                srcbufcpystart = srcbuf + cn_l->data_buf.last_read_offset;
                srcbufcpylen = srcbuflen - cn_l->data_buf.last_read_offset;

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                if (userbuf) {
                        int notcopied = copy_to_user(dst, srcbufcpystart, cpy);
                        cpy -= notcopied;
                        if (unlikely(notcopied > 0))
                                rc = -EFAULT;
                } else {
                        memcpy(dst, srcbufcpystart, cpy);
                }

                dst += cpy;
                len -= cpy;
                totalcpy += cpy;

                cn_l->data_buf.read_remaining -= cpy;
                cn_l->data_buf.last_read_offset += cpy;

                if (cpy == srcbufcpylen)
                        databuf_nextreadchunk(cn_l);

                if (unlikely(rc < 0)) {
                        if (totalcpy == 0)
                                totalcpy = rc;
                        break;
                }
        }

        return totalcpy;
}

void databuf_pull(struct conn *cn_l, char *dst, int len)
{
        _databuf_pull(cn_l, dst, len, 0);
}

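/*
 * Fill the iovecs of msg from the receive buffer.  Returns the number
 * of bytes copied, -EPIPE if the buffer is empty and the source is
 * gone, or -EAGAIN if it is empty but more data may arrive.
 */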
ssize_t databuf_pulluser(struct conn *trgt_sock_l, struct msghdr *msg)
{
        ssize_t copied = 0;
        int iovidx = 0;
        int iovread = 0;

        while (iovidx < msg->msg_iovlen) {
                int rc;

                struct iovec *iov = msg->msg_iov + iovidx;
                char __user *userbuf = iov->iov_base + iovread;
                unsigned int len = iov->iov_len - iovread;

                if (len == 0) {
                        iovidx++;
                        iovread = 0;
                        continue;
                }

                if (trgt_sock_l->data_buf.read_remaining == 0) {
                        if (trgt_sock_l->sourcetype == SOURCE_NONE)
                                rc = -EPIPE;
                        else
                                rc = -EAGAIN;
                } else {
                        if (len > trgt_sock_l->data_buf.read_remaining)
                                len = trgt_sock_l->data_buf.read_remaining;

                        rc = _databuf_pull(trgt_sock_l, userbuf, len, 1);
                }

                BUG_ON(rc == 0);

                if (rc < 0) {
                        if (copied == 0)
                                copied = rc;
                        break;
                }

                copied += rc;
                iovread += rc;
        }

        return copied;
}

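/*
 * Move the read position back by the given number of bytes, so that
 * the data is returned again by the next pull.
 */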
void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
{
        trgt_out_l->data_buf.read_remaining += bytes;

        BUG_ON(trgt_out_l->data_buf.lastread == 0);

        while (bytes > trgt_out_l->data_buf.last_read_offset) {
                struct data_buf_item *lastread;

                bytes -= trgt_out_l->data_buf.last_read_offset;

                lastread = container_of(
                                trgt_out_l->data_buf.lastread->buf_list.prev,
                                struct data_buf_item, buf_list);
                trgt_out_l->data_buf.lastread = lastread;
                BUG_ON(&(lastread->buf_list) ==
                                &(trgt_out_l->data_buf.items));

                /* the previous item had been read completely */
                if (lastread->type == TYPE_BUF)
                        trgt_out_l->data_buf.last_read_offset =
                                        lastread->data.buf.datalen;
                else if (lastread->type == TYPE_SKB)
                        trgt_out_l->data_buf.last_read_offset =
                                        lastread->data.skb->len;
                else
                        BUG();
        }

        trgt_out_l->data_buf.last_read_offset -= bytes;
}

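/*
 * Copy len bytes starting at absolute offset startpos out of the
 * buffered data, without moving the read position.
 */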
void databuf_pullold(struct conn *trgt_out_l, __u32 startpos, char *dst,
                int len)
{
        __u32 pos = trgt_out_l->data_buf.first_offset;
        struct data_buf_item *dbi = container_of(
                        trgt_out_l->data_buf.items.next,
                        struct data_buf_item, buf_list);

        while (1) {
                int srcbuflen;

                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                if (dbi->type == TYPE_BUF) {
                        srcbuflen = dbi->data.buf.datalen;
                } else if (dbi->type == TYPE_SKB) {
                        srcbuflen = dbi->data.skb->len;
                } else {
                        BUG();
                }

                if (((__s32) (pos + srcbuflen - startpos)) > 0)
                        break;

                pos += srcbuflen;
                dbi = container_of(dbi->buf_list.next, struct data_buf_item,
                                buf_list);
        }

        while (len > 0) {
                int cpy = len;

                char *srcbuf = 0;
                int srcbuflen = 0;

                char *srcbufcpystart = 0;
                int srcbufcpylen = 0;

                BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));

                if (dbi->type == TYPE_BUF) {
                        srcbuf = dbi->data.buf.buf;
                        srcbuflen = dbi->data.buf.datalen;
                } else if (dbi->type == TYPE_SKB) {
                        srcbuf = dbi->data.skb->data;
                        srcbuflen = dbi->data.skb->len;
                } else {
                        BUG();
                }

                BUG_ON(((__s32) (pos - startpos)) > 0);

                srcbufcpystart = srcbuf + ((__s32) (startpos - pos));
                srcbufcpylen = srcbuflen - ((__s32) (startpos - pos));

                if (cpy > srcbufcpylen)
                        cpy = srcbufcpylen;

                memcpy(dst, srcbufcpystart, cpy);

                dst += cpy;
                len -= cpy;
                startpos += cpy;

                pos += srcbuflen;
                dbi = container_of(dbi->buf_list.next, struct data_buf_item,
                                buf_list);
        }
}

/* ack up to *not* including pos */
void databuf_ack(struct conn *trgt_out_l, __u32 pos)
{
        __u32 acked = 0;

        while (!list_empty(&(trgt_out_l->data_buf.items))) {
                struct data_buf_item *firstitem = container_of(
                                trgt_out_l->data_buf.items.next,
                                struct data_buf_item, buf_list);
                int firstlen = 0;

                if (firstitem == trgt_out_l->data_buf.lastread)
                        break;

                if (firstitem->type == TYPE_BUF) {
                        firstlen = firstitem->data.buf.datalen;
                } else if (firstitem->type == TYPE_SKB) {
                        firstlen = firstitem->data.skb->len;
                } else {
                        BUG();
                }

                if (((__s32) (trgt_out_l->data_buf.first_offset + firstlen -
                                pos)) > 0)
                        break;

                trgt_out_l->data_buf.first_offset += firstlen;
                acked += firstlen;

                databuf_item_free(trgt_out_l, firstitem);
        }

        trgt_out_l->data_buf.totalsize -= acked;

        BUG_ON(trgt_out_l->data_buf.totalsize == 0 &&
                        trgt_out_l->data_buf.overhead != 0);

        if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) {
                __u32 amount = acked > trgt_out_l->data_buf.cpacket_buffer ?
                                trgt_out_l->data_buf.cpacket_buffer : acked;
                free_cpacket_buffer(amount);
                trgt_out_l->data_buf.cpacket_buffer -= amount;
        }

        if (trgt_out_l->sourcetype == SOURCE_IN)
                refresh_speedstat(trgt_out_l, acked);
}

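/*
 * Like databuf_ack(), but frees everything up to the current read
 * position instead of an explicit offset.
 */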
void databuf_ackread(struct conn *cn_l)
{
        __u32 acked = 0;

        while (!list_empty(&(cn_l->data_buf.items)) &&
                        cn_l->data_buf.lastread != 0) {
                struct data_buf_item *firstitem = container_of(
                                cn_l->data_buf.items.next,
                                struct data_buf_item, buf_list);

                if (firstitem == cn_l->data_buf.lastread)
                        break;

                if (firstitem->type == TYPE_BUF) {
                        acked += firstitem->data.buf.datalen;
                } else if (firstitem->type == TYPE_SKB) {
                        acked += firstitem->data.skb->len;
                } else {
                        BUG();
                }

                databuf_item_free(cn_l, firstitem);
        }

        cn_l->data_buf.first_offset += acked;
        cn_l->data_buf.totalsize -= acked;

        BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0);

        if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) {
                __u32 amount = acked > cn_l->data_buf.cpacket_buffer ?
                                cn_l->data_buf.cpacket_buffer : acked;
                free_cpacket_buffer(amount);
                cn_l->data_buf.cpacket_buffer -= amount;
        }

        if (cn_l->sourcetype == SOURCE_IN)
                refresh_speedstat(cn_l, acked);
}

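/*
 * Append len bytes from buf to the tail of the data_buf, allocating
 * new TYPE_BUF items (at least 64 bytes, at most one page) as needed.
 * If maxusage is nonzero it bounds totalsize + overhead; otherwise
 * maxcpy bounds how many bytes are accepted.  Returns the number of
 * bytes appended, or a negative errno if nothing could be appended.
 */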
static __s64 _receive_buf(struct conn *cn_l, char *buf, __u32 len, int userbuf,
                __u32 maxcpy, __u32 maxusage)
{
        struct data_buf_item *item = 0;

        __s64 totalcpy = 0;

        if (list_empty(&(cn_l->data_buf.items)) == 0) {
                struct list_head *last = cn_l->data_buf.items.prev;
                item = container_of(last, struct data_buf_item, buf_list);
        }

        while (len > 0) {
                int rc = 0;
                int cpy = len;

                if (item == 0 || item->type != TYPE_BUF ||
                                item->data.buf.buflen <=
                                item->data.buf.datalen) {
                        __u32 buflen = len;

                        if (maxusage != 0) {
                                if (cn_l->data_buf.totalsize +
                                                cn_l->data_buf.overhead >
                                                maxusage) {
                                        rc = -EAGAIN;
                                        goto error;
                                }

                                buflen = maxusage - cn_l->data_buf.totalsize -
                                                cn_l->data_buf.overhead -
                                                sizeof(struct data_buf_item);
                        } else {
                                if (totalcpy + 64 > maxcpy &&
                                                totalcpy + len > maxcpy) {
                                        rc = -EAGAIN;
                                        goto error;
                                }

                                if (totalcpy + buflen < maxcpy)
                                        buflen = maxcpy - totalcpy;
                        }

                        if (buflen < 64)
                                buflen = 64;
                        if (buflen > PAGESIZE)
                                buflen = PAGESIZE;
                        if (buflen > 32768)
                                buflen = 32768;

                        item = kmem_cache_alloc(data_buf_item_slab,
                                        GFP_KERNEL);
                        if (unlikely(item == 0)) {
                                rc = -ENOMEM;
                                goto error;
                        }
                        memset(item, 0, sizeof(struct data_buf_item));
                        item->type = TYPE_BUF;
                        item->data.buf.buf = kmalloc(buflen, GFP_KERNEL);

                        if (unlikely(item->data.buf.buf == 0)) {
                                kmem_cache_free(data_buf_item_slab, item);
                                rc = -ENOMEM;
                                goto error;
                        }
                        item->data.buf.datalen = 0;

                        list_add_tail(&(item->buf_list),
                                        &(cn_l->data_buf.items));
                        item->data.buf.buflen = buflen;
                        cn_l->data_buf.overhead += buflen +
                                        sizeof(struct data_buf_item);
                }

                BUG_ON(item->type != TYPE_BUF);
                BUG_ON(item->data.buf.buflen <= item->data.buf.datalen);

                if (item->data.buf.buflen - item->data.buf.datalen < cpy)
                        cpy = (item->data.buf.buflen - item->data.buf.datalen);

                if (userbuf) {
                        int notcopied = copy_from_user(item->data.buf.buf +
                                        item->data.buf.datalen, buf, cpy);
                        cpy -= notcopied;
                        if (unlikely(notcopied > 0))
                                rc = -EFAULT;
                } else {
                        memcpy(item->data.buf.buf + item->data.buf.datalen,
                                        buf, cpy);
                }

                buf += cpy;
                len -= cpy;
                cn_l->data_buf.read_remaining += cpy;
                cn_l->data_buf.totalsize += cpy;
                cn_l->data_buf.overhead -= cpy;
                BUG_ON(cn_l->data_buf.totalsize == 0 &&
                                cn_l->data_buf.overhead != 0);
                totalcpy += cpy;

                item->data.buf.datalen += cpy;

error:
                if (unlikely(rc < 0)) {
                        if (totalcpy == 0)
                                return rc;
                        break;
                }
        }

        return totalcpy;
}

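/*
 * Append data from the iovecs of msg, accepting at most maxcpy bytes
 * in total.  Returns the number of bytes accepted, or a negative errno
 * if nothing was accepted.
 */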
__s64 receive_userbuf(struct conn *src_sock_l, struct msghdr *msg, __u32 maxcpy,
                __u32 maxusage)
{
        __s64 copied = 0;
        int iovidx = 0;
        int iovread = 0;

        while (iovidx < msg->msg_iovlen) {
                struct iovec *iov = msg->msg_iov + iovidx;
                char __user *userbuf = iov->iov_base + iovread;
                __u32 len = iov->iov_len - iovread;
                __s64 rc;

                if (len == 0) {
                        iovidx++;
                        iovread = 0;
                        continue;
                }

                BUG_ON(copied < 0);
                BUG_ON(copied > maxcpy);
                rc = _receive_buf(src_sock_l, userbuf, len, 1, maxcpy - copied,
                                maxusage);

                if (rc < 0) {
                        if (copied == 0)
                                copied = rc;
                        break;
                }

                copied += rc;
                iovread += rc;

                if (rc < len)
                        break;
        }

        return copied;
}

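/*
 * Append a control packet response.  Space must have been reserved in
 * cpacket_buffer beforehand, so this cannot fail.
 */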
void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len)
{
        __s64 rc;
        BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer <
                        trtg_unconn_l->data_buf.totalsize + len);
        rc = _receive_buf(trtg_unconn_l, buf, len, 0, len, 0);
        BUG_ON(rc < 0);
        BUG_ON(rc < len);
}

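/*
 * Queue an incoming skb as a TYPE_SKB item.  Returns 0 on success;
 * if no item could be allocated, returns 1 and leaves the skb to the
 * caller.
 */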
int receive_skb(struct conn *src_in_l, struct sk_buff *skb)
{
        struct data_buf_item *item;

        item = kmem_cache_alloc(data_buf_item_slab, GFP_KERNEL);

        if (unlikely(item == 0))
                return 1;

        item->data.skb = skb;
        item->type = TYPE_SKB;
        list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
        src_in_l->data_buf.read_remaining += skb->len;
        src_in_l->data_buf.totalsize += skb->len;
        src_in_l->data_buf.overhead += sizeof(struct data_buf_item) +
                        sizeof(struct sk_buff);

        BUG_ON(src_in_l->data_buf.totalsize == 0 &&
                        src_in_l->data_buf.overhead != 0);

        return 0;
}

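/*
 * Unlocks the rcv_lock.  If the receive window limit was reached,
 * announce the freshly freed window to the peer so it can resume
 * sending.
 */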
static void _wake_sender_in(struct conn *src_in_l)
{
        int windowlimitreached = (src_in_l->source.in.next_seqno ==
                        src_in_l->source.in.window_seqnolimit_last);
        struct neighbor *nb = src_in_l->source.in.nb;
        __u32 conn_id = src_in_l->reversedir->target.out.conn_id;
        __u32 next_seqno = src_in_l->source.in.next_seqno;

        __u8 window;
        struct control_msg_out *cm;

        drain_ooo_queue(src_in_l);
        mutex_unlock(&(src_in_l->rcv_lock));

        if (windowlimitreached == 0)
                return;

        window = get_window(src_in_l, nb);
        if (window == 0)
                return;

        cm = alloc_control_msg(nb, ACM_PRIORITY_HIGH);
        if (unlikely(cm == 0))
                send_ping_all_conns(nb);
        else
                send_ack_conn(cm, src_in_l, conn_id, next_seqno);
}

void wake_sender(struct conn *cn)
{
        mutex_lock(&(cn->rcv_lock));
        switch (cn->sourcetype) {
        case SOURCE_NONE:
                mutex_unlock(&(cn->rcv_lock));
                parse(cn->reversedir, 0);
                break;
        case SOURCE_SOCK:
                wake_up_interruptible(&(cn->source.sock.wait));
                mutex_unlock(&(cn->rcv_lock));
                break;
        case SOURCE_IN:
                _wake_sender_in(cn); /* mutex_unlock inside */
                break;
        default:
                BUG();
        }
}

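/*
 * Push buffered data towards the target: feed the parser while the
 * conn is unconnected, wake a reading socket, or call flush_out() for
 * TARGET_OUT and re-queue the conn on congestion or out-of-memory.
 */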
void flush_buf(struct conn *cn)
{
        int rc = RC_FLUSH_CONN_OUT_OK;
        mutex_lock(&(cn->rcv_lock));

        switch (cn->targettype) {
        case TARGET_UNCONNECTED:
                mutex_unlock(&(cn->rcv_lock));
                parse(cn, 0);
                break;
        case TARGET_SOCK:
                if (cn->sourcetype != SOURCE_SOCK ||
                                cn->source.sock.delay_flush == 0 ||
                                cn->data_buf.totalsize +
                                cn->data_buf.overhead -
                                cn->data_buf.cpacket_buffer >=
                                BUFFERLIMIT_SOCK_SOCK/2)
                        wake_up_interruptible(&(cn->target.sock.wait));
                mutex_unlock(&(cn->rcv_lock));
                break;
        case TARGET_OUT:
                rc = flush_out(cn, 0, 0);
                mutex_unlock(&(cn->rcv_lock));
                break;
        default:
                BUG();
        }

        refresh_conn_credits(cn, 0, 0);
        unreserve_sock_buffer(cn);

        if (rc == RC_FLUSH_CONN_OUT_CONG) {
#warning todo locking
                qos_enqueue(cn->target.out.nb->dev, &(cn->target.out.rb),
                                QOS_CALLER_CONN);
        } else if (rc == RC_FLUSH_CONN_OUT_OOM) {
                printk(KERN_DEBUG "oom\n");
#warning todo locking
                qos_enqueue(cn->target.out.nb->dev, &(cn->target.out.rb),
                                QOS_CALLER_CONN);
        } else if (rc == RC_FLUSH_CONN_OUT_OK_SENT) {
                wake_sender(cn);
        }
}

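/*
 * Create the slab cache for data_buf_item entries; runs once at
 * module init.
 */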
void __init forward_init(void)
{
        data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
                        sizeof(struct data_buf_item), 8, 0, 0);
}

MODULE_LICENSE("GPL");