conn rcv_lock converted to spinlock, struct cor_sock created, kernel_packet skb_clone...
[cor_2_6_31.git] / net / cor / forward.c
blob1ebf939330f8f4320d36996c981489988b5ceba6
1 /**
2 * Connection oriented routing
3 * Copyright (C) 2007-2011 Michael Blizek
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18 * 02110-1301, USA.
21 #include <linux/mutex.h>
23 #include "cor.h"
/* Slab cache for struct data_buf_item allocations; created in forward_init(). */
struct kmem_cache *data_buf_item_slab;
27 void databuf_init(struct conn *cn_init)
29 memset(&(cn_init->data_buf), 0, sizeof(cn_init->data_buf));
30 INIT_LIST_HEAD(&(cn_init->data_buf.items));
33 static inline void databuf_item_unlink(struct conn *cn_l, struct data_buf_item *item)
35 list_del(&(item->buf_list));
36 if (item->type == TYPE_BUF) {
37 cn_l->data_buf.overhead -= sizeof(struct data_buf_item) +
38 item->buflen - item->datalen;
39 } else if (item->type == TYPE_SKB) {
40 cn_l->data_buf.overhead -= sizeof(struct sk_buff);
41 } else {
42 BUG();
46 void databuf_ackdiscard(struct conn *cn_l)
48 __u32 freed = 0;
49 while (!list_empty(&(cn_l->data_buf.items))) {
50 struct data_buf_item *item = container_of(
51 cn_l->data_buf.items.next,
52 struct data_buf_item, buf_list);
53 freed += item->datalen;
55 databuf_item_unlink(cn_l, item);
56 databuf_item_free(item);
59 cn_l->data_buf.totalsize -= freed;
60 cn_l->data_buf.first_offset += freed;
62 BUG_ON(cn_l->data_buf.totalsize != 0);
63 BUG_ON(cn_l->data_buf.overhead != 0);
65 if (cn_l->data_buf.cpacket_buffer != 0) {
66 free_cpacket_buffer(cn_l->data_buf.cpacket_buffer);
67 cn_l->data_buf.cpacket_buffer = 0;
70 cn_l->data_buf.read_remaining = 0;
71 cn_l->data_buf.last_read_offset = 0;
72 cn_l->data_buf.lastread = 0;
74 if (cn_l->isreset == 0 && cn_l->sourcetype == SOURCE_IN)
75 refresh_speedstat(cn_l, freed);
78 void reset_seqno(struct conn *cn_l, __u32 initseqno)
80 cn_l->data_buf.first_offset = initseqno -
81 cn_l->data_buf.totalsize +
82 cn_l->data_buf.read_remaining;
85 static void databuf_nextreadchunk(struct conn *cn_l)
87 if (cn_l->data_buf.lastread == 0) {
88 BUG_ON(cn_l->data_buf.last_read_offset != 0);
89 BUG_ON(list_empty(&(cn_l->data_buf.items)));
90 cn_l->data_buf.lastread = container_of(
91 cn_l->data_buf.items.next,
92 struct data_buf_item, buf_list);
93 } else if (&(cn_l->data_buf.lastread->buf_list) !=
94 cn_l->data_buf.items.prev) {
95 cn_l->data_buf.lastread = container_of(
96 cn_l->data_buf.lastread->buf_list.next,
97 struct data_buf_item, buf_list);
99 cn_l->data_buf.last_read_offset = 0;
103 void databuf_pull(struct conn *cn_l, char *dst, int len)
105 BUG_ON(cn_l->data_buf.read_remaining < len);
107 if (cn_l->data_buf.lastread == 0)
108 databuf_nextreadchunk(cn_l);
110 while(len > 0) {
111 int cpy = len;
113 char *srcbufcpystart = 0;
114 int srcbufcpylen = 0;
116 BUG_ON(cn_l->data_buf.lastread == 0);
118 srcbufcpystart = cn_l->data_buf.lastread->buf +
119 cn_l->data_buf.last_read_offset;
120 srcbufcpylen = cn_l->data_buf.lastread->datalen -
121 cn_l->data_buf.last_read_offset;
123 if (cpy > srcbufcpylen)
124 cpy = srcbufcpylen;
126 memcpy(dst, srcbufcpystart, cpy);
128 dst += cpy;
129 len -= cpy;
131 cn_l->data_buf.read_remaining -= cpy;
132 cn_l->data_buf.last_read_offset += cpy;
134 if (cpy == srcbufcpylen)
135 databuf_nextreadchunk(cn_l);
139 void databuf_pull_dbi(struct cor_sock *cs_rl, struct conn *trgt_sock_l)
141 struct data_buf_item *dbi = 0;
142 BUG_ON(cs_rl->type != SOCKTYPE_CONN);
143 BUG_ON(cs_rl->data.conn.rcvitem != 0);
145 if (trgt_sock_l->data_buf.read_remaining == 0)
146 return;
148 if (trgt_sock_l->data_buf.lastread == 0)
149 databuf_nextreadchunk(trgt_sock_l);
151 dbi = trgt_sock_l->data_buf.lastread;
153 BUG_ON(dbi == 0);
155 BUG_ON(&(dbi->buf_list) != trgt_sock_l->data_buf.items.next);
157 cs_rl->data.conn.rcvitem = dbi;
158 cs_rl->data.conn.rcvoffset = trgt_sock_l->data_buf.last_read_offset;
160 trgt_sock_l->data_buf.first_offset += dbi->datalen;
161 trgt_sock_l->data_buf.totalsize -= dbi->datalen;
162 trgt_sock_l->data_buf.read_remaining -= dbi->datalen;
164 if (unlikely(trgt_sock_l->data_buf.cpacket_buffer != 0)) {
165 __u32 amount = dbi->datalen <
166 trgt_sock_l->data_buf.cpacket_buffer ?
167 dbi->datalen :
168 trgt_sock_l->data_buf.cpacket_buffer;
169 free_cpacket_buffer(amount);
170 trgt_sock_l->data_buf.cpacket_buffer -= amount;
173 databuf_item_unlink(trgt_sock_l, dbi);
175 trgt_sock_l->data_buf.lastread = 0;
176 trgt_sock_l->data_buf.last_read_offset = 0;
178 /* databuf_item_free(firstitem); */
181 void databuf_unpull(struct conn *trgt_out_l, __u32 bytes)
183 trgt_out_l->data_buf.read_remaining += bytes;
185 BUG_ON(trgt_out_l->data_buf.lastread == 0);
187 while (bytes > trgt_out_l->data_buf.last_read_offset) {
188 bytes -= trgt_out_l->data_buf.last_read_offset;
189 trgt_out_l->data_buf.lastread = container_of(
190 trgt_out_l->data_buf.lastread->buf_list.prev,
191 struct data_buf_item, buf_list);
192 BUG_ON(&(trgt_out_l->data_buf.lastread->buf_list) ==
193 &(trgt_out_l->data_buf.items));
196 trgt_out_l->data_buf.last_read_offset -= bytes;
199 void databuf_pullold(struct conn *trgt_out_l, __u32 startpos, char *dst,
200 int len)
202 __u32 pos = trgt_out_l->data_buf.first_offset;
203 struct data_buf_item *dbi = container_of(
204 trgt_out_l->data_buf.items.next,
205 struct data_buf_item, buf_list);
207 while(1) {
208 BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));
210 if (((__s32) (pos + dbi->datalen - startpos)) > 0)
211 break;
213 pos += dbi->datalen;
214 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
215 buf_list);
218 while (len > 0) {
219 int cpy = len;
221 char *srcbufcpystart = 0;
222 int srcbufcpylen = 0;
224 BUG_ON(&(dbi->buf_list) == &(trgt_out_l->data_buf.items));
226 BUG_ON(((__s32) (pos - startpos)) > 0);
228 srcbufcpystart = dbi->buf + ((__s32) (startpos - pos));
229 srcbufcpylen = dbi->datalen - ((__s32) (startpos - pos));
231 if (cpy > srcbufcpylen)
232 cpy = srcbufcpylen;
234 memcpy(dst, srcbufcpystart, cpy);
236 dst += cpy;
237 len -= cpy;
238 startpos += cpy;
240 pos += dbi->datalen;
241 dbi = container_of(dbi->buf_list.next, struct data_buf_item,
242 buf_list);
246 /* ack up to *not* including pos */
247 void databuf_ack(struct conn *trgt_out_l, __u32 pos)
249 __u32 acked = 0;
251 while (!list_empty(&(trgt_out_l->data_buf.items))) {
252 struct data_buf_item *firstitem = container_of(
253 trgt_out_l->data_buf.items.next,
254 struct data_buf_item, buf_list);
256 if (firstitem == trgt_out_l->data_buf.lastread)
257 break;
259 if ( ((__s32) (trgt_out_l->data_buf.first_offset +
260 firstitem->datalen - pos)) > 0)
261 break;
263 trgt_out_l->data_buf.first_offset += firstitem->datalen;
264 acked += firstitem->datalen;
266 databuf_item_unlink(trgt_out_l, firstitem);
267 databuf_item_free(firstitem);
270 trgt_out_l->data_buf.totalsize -= acked;
272 BUG_ON(trgt_out_l->data_buf.totalsize == 0 &&
273 trgt_out_l->data_buf.overhead != 0);
275 if (unlikely(trgt_out_l->data_buf.cpacket_buffer != 0)) {
276 __u32 amount = acked < trgt_out_l->data_buf.cpacket_buffer ?
277 acked : trgt_out_l->data_buf.cpacket_buffer;
278 free_cpacket_buffer(amount);
279 trgt_out_l->data_buf.cpacket_buffer -= amount;
282 if (trgt_out_l->sourcetype == SOURCE_IN)
283 refresh_speedstat(trgt_out_l, acked);
286 void databuf_ackread(struct conn *cn_l)
288 __u32 acked = 0;
290 while (!list_empty(&(cn_l->data_buf.items)) &&
291 cn_l->data_buf.lastread != 0) {
292 struct data_buf_item *firstitem = container_of(
293 cn_l->data_buf.items.next,
294 struct data_buf_item, buf_list);
296 if (firstitem == cn_l->data_buf.lastread)
297 break;
299 acked += firstitem->datalen;
301 databuf_item_unlink(cn_l, firstitem);
302 databuf_item_free(firstitem);
305 cn_l->data_buf.first_offset += acked;
306 cn_l->data_buf.totalsize -= acked;
308 BUG_ON(cn_l->data_buf.totalsize == 0 && cn_l->data_buf.overhead != 0);
310 if (unlikely(cn_l->data_buf.cpacket_buffer != 0)) {
311 __u32 amount = acked < cn_l->data_buf.cpacket_buffer ?
312 acked : cn_l->data_buf.cpacket_buffer;
313 free_cpacket_buffer(amount);
314 cn_l->data_buf.cpacket_buffer -= amount;
317 if (cn_l->sourcetype == SOURCE_IN)
318 refresh_speedstat(cn_l, acked);
321 __s64 receive_buf(struct conn *cn_l, char *buf, __u32 datalen, __u32 buflen,
322 int forcecpy)
324 char *freewhenfinished = 0;
325 struct data_buf_item *item = 0;
327 __s64 totalcpy = 0;
329 if (list_empty(&(cn_l->data_buf.items)) == 0) {
330 struct list_head *last = cn_l->data_buf.items.prev;
331 item = container_of(last, struct data_buf_item, buf_list);
334 while (datalen > 0) {
335 int rc = 0;
336 int cpy = datalen;
338 #warning todo convert to bugon and do check on caller
339 if (unlikely(unlikely(cn_l->data_buf.totalsize + datalen >
340 (1 << 30)) ||
341 unlikely(cn_l->data_buf.overhead > (1<< 30)))) {
342 rc = -EAGAIN;
343 goto error;
346 if (forcecpy == 0 && item != 0 &&
347 datalen < (item->buflen - item->datalen) &&
348 datalen*2 < (buflen +
349 sizeof(struct data_buf_item))) {
350 forcecpy = 1;
351 freewhenfinished = buf;
354 if (forcecpy == 0 || item == 0 ||
355 item->buflen <= item->datalen) {
356 item = kmem_cache_alloc(data_buf_item_slab, GFP_ATOMIC);
357 if (unlikely(item == 0)) {
358 rc = -ENOMEM;
359 goto error;
362 memset(item, 0, sizeof(item));
363 item->type = TYPE_BUF;
365 if (forcecpy == 0) {
366 item->buf = buf;
367 item->datalen = datalen;
368 item->buflen = buflen;
369 } else {
370 buflen = buf_optlen(datalen);
371 item->buf = kmalloc(buflen, GFP_ATOMIC);
373 if (unlikely(item->buf == 0)) {
374 kmem_cache_free(data_buf_item_slab,
375 item);
376 rc = -ENOMEM;
377 goto error;
379 item->datalen = 0;
380 item->buflen = buflen;
383 list_add_tail(&(item->buf_list),
384 &(cn_l->data_buf.items));
386 cn_l->data_buf.overhead += item->buflen +
387 sizeof(struct data_buf_item);
390 if (forcecpy == 0) {
391 cpy = item->datalen;
392 } else {
393 BUG_ON(item->type != TYPE_BUF);
394 BUG_ON(item->buflen <= item->datalen);
396 if (item->buflen - item->datalen < cpy)
397 cpy = (item->buflen - item->datalen);
399 memcpy(item->buf + item->datalen, buf, cpy);
400 item->datalen += cpy;
403 buf += cpy;
404 datalen -= cpy;
406 cn_l->data_buf.read_remaining += cpy;
407 cn_l->data_buf.totalsize += cpy;
408 cn_l->data_buf.overhead -= cpy;
409 BUG_ON(cn_l->data_buf.totalsize != 0 &&
410 cn_l->data_buf.overhead == 0);
411 totalcpy += cpy;
413 error:
414 if (unlikely(rc < 0)) {
415 if (totalcpy == 0)
416 return rc;
417 break;
421 if (freewhenfinished != 0)
422 kfree(freewhenfinished);
424 return totalcpy;
427 void receive_cpacketresp(struct conn *trtg_unconn_l, char *buf, int len)
429 __s64 rc;
430 BUG_ON(trtg_unconn_l->data_buf.cpacket_buffer <
431 trtg_unconn_l->data_buf.totalsize + len);
432 rc = receive_buf(trtg_unconn_l, buf, len, len, 1);
433 BUG_ON(rc < 0);
434 BUG_ON(rc < len);
437 int receive_skb(struct conn *src_in_l, struct sk_buff *skb)
439 struct skb_procstate *ps = skb_pstate(skb);
440 struct data_buf_item *item = &(ps->funcstate.rcv.dbi);
442 if (unlikely(unlikely(src_in_l->data_buf.totalsize + skb->len >
443 (1 << 30)) || unlikely(src_in_l->data_buf.overhead >
444 (1 << 30))))
445 return 1;
447 item->type = TYPE_SKB;
448 item->buf = skb->data;
449 item->datalen = skb->len;
450 item->buflen = item->datalen;
451 list_add_tail(&(item->buf_list), &(src_in_l->data_buf.items));
453 src_in_l->data_buf.read_remaining += item->datalen;
454 src_in_l->data_buf.totalsize += item->datalen;
455 src_in_l->data_buf.overhead += sizeof(struct sk_buff);
457 return 0;
460 void wake_sender(struct conn *cn)
462 unreserve_sock_buffer(cn);
464 spin_lock_bh(&(cn->rcv_lock));
465 switch (cn->sourcetype) {
466 case SOURCE_NONE:
467 spin_unlock_bh(&(cn->rcv_lock));
468 parse(cn->reversedir, 0);
469 break;
470 case SOURCE_SOCK:
471 wake_up_interruptible(&(cn->source.sock.wait));
472 spin_unlock_bh(&(cn->rcv_lock));
473 break;
474 case SOURCE_IN:
475 drain_ooo_queue(cn);
476 spin_unlock_bh(&(cn->rcv_lock));
477 get_window(cn, 0, 0, 0);
478 break;
479 default:
480 BUG();
484 void flush_buf(struct conn *cn)
486 int rc;
487 int sent = 0;
488 spin_lock_bh(&(cn->rcv_lock));
490 switch (cn->targettype) {
491 case TARGET_UNCONNECTED:
492 spin_unlock_bh(&(cn->rcv_lock));
493 parse(cn, 0);
494 break;
495 case TARGET_SOCK:
496 if (cn->sourcetype != SOURCE_SOCK ||
497 cn->source.sock.delay_flush == 0 ||
498 cn->data_buf.totalsize +
499 cn->data_buf.overhead -
500 cn->data_buf.cpacket_buffer >=
501 BUFFERLIMIT_SOCK_SOCK/2)
502 wake_up_interruptible(&(cn->target.sock.wait));
503 spin_unlock_bh(&(cn->rcv_lock));
504 break;
505 case TARGET_OUT:
506 rc = flush_out(cn, 0, 0);
507 spin_unlock_bh(&(cn->rcv_lock));
508 sent = (rc == RC_FLUSH_CONN_OUT_OK_SENT);
509 break;
510 case TARGET_DISCARD:
511 databuf_ackdiscard(cn);
512 spin_unlock_bh(&(cn->rcv_lock));
513 sent = 1;
514 break;
515 default:
516 BUG();
519 refresh_conn_credits(cn, 0, 0);
521 if (sent) {
522 wake_sender(cn);
526 void __init forward_init(void)
528 data_buf_item_slab = kmem_cache_create("cor_data_buf_item",
529 sizeof(struct data_buf_item), 8, 0, 0);
532 MODULE_LICENSE("GPL");