/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"
/**
 * sock_bt_wait_list is ordered by smallest tracker usage first, and each
 * tracker's waiting_conns by smallest alloclimit first; this is the order
 * in which resuming will happen.
 */
DEFINE_MUTEX(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;
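
/*
 * sock_bt_list holds the trackers without waiting conns, sock_bt_wait_list
 * those with waiting conns. Both lists, sock_bufferusage and the
 * sock_buffertracker fields are protected by sock_bufferlimits_lock.
 */
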
static void free_sbt(struct kref *ref)
{
        struct sock_buffertracker *sbt = container_of(ref,
                        struct sock_buffertracker, ref);

        BUG_ON(sbt->usage != 0);
        BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

        list_del(&(sbt->lh));
        kfree(sbt);
}

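/*
 * Look the tracker for uid up on both lists; if there is none yet, allocate
 * a fresh one on sock_bt_list. Returns the tracker with a reference held,
 * or 0 if the allocation failed.
 */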
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
        struct sock_buffertracker *sbt;
        struct list_head *curr;

        curr = sock_bt_list.next;
        while (curr != &sock_bt_list) {
                sbt = container_of(curr, struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
                if (sbt->uid == uid)
                        goto found;
                curr = curr->next;
        }

        curr = sock_bt_wait_list.next;
        while (curr != &sock_bt_wait_list) {
                sbt = container_of(curr, struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));
                if (sbt->uid == uid)
                        goto found;
                curr = curr->next;
        }

        sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL);
        if (sbt != 0) {
                memset(sbt, 0, sizeof(struct sock_buffertracker));
                sbt->uid = uid;
                list_add_tail(&(sbt->lh), &sock_bt_list);
                INIT_LIST_HEAD(&(sbt->delflush_conns));
                INIT_LIST_HEAD(&(sbt->waiting_conns));
                kref_init(&(sbt->ref));
        }

        /* only reached via the gotos in the search loops above */
        if (0) {
found:
                kref_get(&(sbt->ref));
        }

        return sbt;
}

static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
                int waitingconnremoved)
{
        if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
                list_del(&(sbt->lh));
                list_add_tail(&(sbt->lh), &sock_bt_list);
                return;
        }

        if (list_empty(&(sbt->waiting_conns)))
                return;

        while (sbt->lh.next != &sock_bt_wait_list) {
                struct sock_buffertracker *next = container_of(sbt->lh.next,
                                struct sock_buffertracker, lh);

                BUG_ON(sbt->lh.next == &sock_bt_list);

                if (sbt->usage <= next->usage)
                        break;

                list_del(&(sbt->lh));
                list_add(&(sbt->lh), &(next->lh));
        }
}

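/*
 * Flush the delayed-flush conns of sbt that have data buffered. Before the
 * first flush_buf() call, sock_bufferlimits_lock is dropped (flush_buf() is
 * apparently not safe to call with it held); the function then returns
 * nonzero so that the caller re-locks and restarts its list walk.
 */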
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
        int restart = 0;
        struct list_head *curr = sbt->delflush_conns.next;

        while (curr != &(sbt->delflush_conns)) {
                struct conn *src_in_o = container_of(curr, struct conn,
                                source.sock.delflush_list);
                int flush = 0;

                mutex_lock(&(src_in_o->rcv_lock));

                BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);

                BUG_ON(src_in_o->source.sock.delay_flush == 0);

                if (src_in_o->data_buf.read_remaining != 0) {
                        src_in_o->source.sock.delay_flush = 0;
                        list_del(&(src_in_o->source.sock.delflush_list));
                        flush = 1;
                }

                mutex_unlock(&(src_in_o->rcv_lock));

                if (flush) {
                        if (restart == 0) {
                                restart = 1;
                                kref_get(&(sbt->ref));
                                mutex_unlock(&sock_bufferlimits_lock);
                        }
                        flush_buf(src_in_o);
                }

                curr = curr->next;
        }

        if (restart)
                kref_put(&(sbt->ref), free_sbt);

        return restart;
}

static void oosbs_global(void)
{
        struct list_head *curr;

        if (0) {
restart:
                mutex_lock(&sock_bufferlimits_lock);
        }

        curr = sock_bt_list.prev;
        while (curr != &sock_bt_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }

        curr = sock_bt_wait_list.prev;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));
                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }
}

static void oosbs_user(void)
{
        struct list_head *curr;

        if (0) {
restart:
                mutex_lock(&sock_bufferlimits_lock);
        }

        curr = sock_bt_wait_list.prev;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *sbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(sbt->waiting_conns)));

                if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
                        break;

                if (oosbs_resumesbt(sbt))
                        goto restart;
                curr = curr->prev;
        }
}

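/*
 * Work item that runs when a conn had to be put on the wait lists: first
 * try to reclaim space from heavy users only (oosbs_user); if global usage
 * is at or above 3/4 of BUFFERLIMIT_SOCK_GLOBAL, reclaim from everybody
 * (oosbs_global), largest usage first.
 */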
static void outofsockbufferspace(struct work_struct *work)
{
        mutex_lock(&sock_bufferlimits_lock);
        if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
                oosbs_user();
                if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
                        goto global;
        } else {
global:
                oosbs_global();
        }
        outofsockbufferspace_scheduled = 0;
        mutex_unlock(&sock_bufferlimits_lock);
}

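/*
 * The requested allocation could not be granted: sort sbt into
 * sock_bt_wait_list (by usage), sort the conn into sbt->waiting_conns (by
 * alloclimit) and schedule the out-of-buffer-space worker.
 */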
static void _reserve_sock_buffer_inswl(struct conn *src_in_l)
{
        struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
        struct list_head *curr;

        BUG_ON(sbt == 0);

        if (list_empty(&(sbt->waiting_conns)) == 0)
                goto wlinserted;

        list_del(&(sbt->lh));

        curr = sock_bt_wait_list.next;
        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *currsbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(currsbt->waiting_conns)));
                if (sbt->usage < currsbt->usage) {
                        list_add(&(sbt->lh), curr);
                        goto wlinserted;
                }
                curr = curr->next;
        }

        list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
        curr = sbt->waiting_conns.next;
        while (curr != &(sbt->waiting_conns)) {
                struct conn *currrconn = container_of(curr, struct conn,
                                source.sock.alwait_list);
                BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
                if (src_in_l->source.sock.alloclimit <
                                currrconn->source.sock.alloclimit) {
                        list_add(&(src_in_l->source.sock.alwait_list), curr);
                        goto wcinserted;
                }
                curr = curr->next;
        }

        list_add_tail(&(src_in_l->source.sock.alwait_list),
                        &(sbt->waiting_conns));

wcinserted:
        src_in_l->source.sock.in_alwait_list = 1;

        if (outofsockbufferspace_scheduled == 0) {
                schedule_work(&outofsockbufferspace_work);
                outofsockbufferspace_scheduled = 1;
        }
}

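/*
 * Try to raise the conn's alloclimit so that "amount" more bytes fit into
 * its buffer, capped at BUFFERLIMIT_SOCK_SOCK. If another user with equal
 * or lower usage is already waiting, or the per-user/global limits would
 * be exceeded, the conn is put on the wait lists instead. Called with
 * sock_bufferlimits_lock and the conn's rcv_lock held.
 */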
static void reserve_sock_buffer(struct conn *src_in_l, __u64 amount)
{
        struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
        struct sock_buffertracker *first_wait_sbt = list_empty(
                        &sock_bt_wait_list) ? 0 : container_of(
                        sock_bt_wait_list.next, struct sock_buffertracker, lh);

        __u32 max = (1 << 30) - 1;

        BUG_ON(sbt == 0);

        if (unlikely(amount > max))
                amount = max;

        amount += src_in_l->data_buf.totalsize + src_in_l->data_buf.overhead -
                        src_in_l->data_buf.cpacket_buffer;

        if (unlikely(amount > max))
                amount = max;

        if (amount > BUFFERLIMIT_SOCK_SOCK)
                amount = BUFFERLIMIT_SOCK_SOCK;

        if (amount <= src_in_l->source.sock.alloclimit)
                return;

        if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
                        first_wait_sbt != sbt &&
                        first_wait_sbt->usage <= sbt->usage) ||
                        amount - src_in_l->source.sock.alloclimit >
                        BUFFERLIMIT_SOCK_USER - sbt->usage ||
                        amount - src_in_l->source.sock.alloclimit >
                        BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
                _reserve_sock_buffer_inswl(src_in_l);
        } else {
                int waitingconnremoved = 0;
                sbt->usage += amount - src_in_l->source.sock.alloclimit;
                sock_bufferusage += amount - src_in_l->source.sock.alloclimit;
                src_in_l->source.sock.alloclimit = amount;

                if (src_in_l->source.sock.in_alwait_list) {
                        list_del(&(src_in_l->source.sock.alwait_list));
                        src_in_l->source.sock.in_alwait_list = 0;
                        waitingconnremoved = 1;
                }
                _reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
        }
}

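/*
 * Hand freed-up buffer space to waiting conns, trackers with the smallest
 * usage and conns with the smallest alloclimit first, waking each sender
 * whose reservation succeeded; the walk stops at the first conn that still
 * does not fit.
 */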
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
        int failed = 0;

        while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
                struct conn *src_in_o = container_of(sbt->waiting_conns.next,
                                struct conn, source.sock.alwait_list);
                mutex_lock(&(src_in_o->rcv_lock));

                BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
                BUG_ON(src_in_o->source.sock.in_alwait_list == 0);
                BUG_ON(src_in_o->source.sock.wait_len == 0);

                reserve_sock_buffer(src_in_o, src_in_o->source.sock.wait_len);

                if (src_in_o->source.sock.alloclimit +
                                src_in_o->data_buf.cpacket_buffer <=
                                src_in_o->data_buf.totalsize +
                                src_in_o->data_buf.overhead) {
                        failed = 1;
                        goto out;
                }

                wake_up_interruptible(&(src_in_o->source.sock.wait));

out:
                mutex_unlock(&(src_in_o->rcv_lock));
        }

        return failed;
}

static void resume_bufferwaiting_socks(void)
{
        struct list_head *curr = sock_bt_wait_list.next;

        while (curr != &sock_bt_wait_list) {
                struct sock_buffertracker *currsbt = container_of(curr,
                                struct sock_buffertracker, lh);
                BUG_ON(list_empty(&(currsbt->waiting_conns)));
                curr = curr->next;

                if (_resume_bufferwaiting_socks(currsbt))
                        return;
        }
}

static void reorder_sock_bt_wait_list(struct sock_buffertracker *sbt)
{
        if (list_empty(&(sbt->waiting_conns)))
                return;

        while (sbt->lh.prev != &sock_bt_wait_list) {
                struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
                                struct sock_buffertracker, lh);

                BUG_ON(sbt->lh.next == &sock_bt_list);

                if (prevsbt->usage <= sbt->usage)
                        break;

                list_del(&(sbt->lh));
                list_add_tail(&(sbt->lh), &(prevsbt->lh));
        }
}

void connreset_sbt(struct conn *cn)
{
        struct sock_buffertracker *sbt;

        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(cn->rcv_lock));

        if (cn->sourcetype != SOURCE_SOCK)
                goto out;

        sbt = cn->source.sock.sbt;
        BUG_ON(sbt == 0);

        if (cn->source.sock.in_alwait_list) {
                list_del(&(cn->source.sock.alwait_list));
                cn->source.sock.in_alwait_list = 0;

                if (list_empty(&(sbt->waiting_conns))) {
                        list_del(&(sbt->lh));
                        list_add_tail(&(sbt->lh), &sock_bt_list);
                }

                reorder_sock_bt_wait_list(sbt);
        }

        sbt->usage -= cn->source.sock.alloclimit;
        if (cn->source.sock.delay_flush) {
                cn->source.sock.delay_flush = 0;
                list_del(&(cn->source.sock.delflush_list));
        }
        kref_put(&(sbt->ref), free_sbt);
        cn->source.sock.sbt = 0;

out:
        mutex_unlock(&(cn->rcv_lock));
        mutex_unlock(&sock_bufferlimits_lock);
}

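/*
 * Shrink the conn's alloclimit back to what is actually buffered and return
 * the difference to the per-user and global budgets. If anything was freed,
 * waiting socks get a chance to resume.
 */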
void unreserve_sock_buffer(struct conn *cn)
{
        int freed = 0;
        struct sock_buffertracker *sbt;

        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(cn->rcv_lock));

        if (cn->sourcetype != SOURCE_SOCK)
                goto out;

        if (unlikely(atomic_read(&(cn->isreset)) != 0))
                goto out;

        sbt = cn->source.sock.sbt;
        BUG_ON(sbt == 0);

        if (cn->data_buf.totalsize + cn->data_buf.overhead <=
                        cn->source.sock.alloclimit +
                        cn->data_buf.cpacket_buffer)
                goto out;

        freed = 1;

        BUG_ON(cn->source.sock.alloclimit > sbt->usage);
        BUG_ON(cn->source.sock.alloclimit > sock_bufferusage);
        BUG_ON(cn->data_buf.cpacket_buffer > cn->data_buf.totalsize +
                        cn->data_buf.overhead);

        sbt->usage -= cn->source.sock.alloclimit;
        sbt->usage += cn->data_buf.totalsize;
        sbt->usage += cn->data_buf.overhead;
        sbt->usage -= cn->data_buf.cpacket_buffer;

        sock_bufferusage -= cn->source.sock.alloclimit;
        sock_bufferusage += cn->data_buf.totalsize;
        sock_bufferusage += cn->data_buf.overhead;
        sock_bufferusage -= cn->data_buf.cpacket_buffer;

        cn->source.sock.alloclimit = cn->data_buf.totalsize +
                        cn->data_buf.overhead - cn->data_buf.cpacket_buffer;

        if (cn->source.sock.alloclimit == 0 &&
                        cn->source.sock.in_alwait_list) {
                list_del(&(cn->source.sock.alwait_list));
                cn->source.sock.in_alwait_list = 0;

                if (list_empty(&(sbt->waiting_conns))) {
                        list_del(&(sbt->lh));
                        list_add_tail(&(sbt->lh), &sock_bt_list);
                }
        }

        reorder_sock_bt_wait_list(sbt);

out:
        mutex_unlock(&(cn->rcv_lock));

        if (freed)
                resume_bufferwaiting_socks();

        mutex_unlock(&sock_bufferlimits_lock);
}

static int check_connlistener_state(struct connlistener *cl)
{
        if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER))
                return 0;

        return 1;
}

static int check_conn_state(struct conn *cn)
{
        if (likely(cn != 0 && cn->sockstate == SOCKSTATE_CONN))
                return 0;

        return 1;
}

int cor_socket_release(struct socket *sock)
{
        struct connlistener *cl = (struct connlistener *) sock->sk;
        struct conn *src_in_o = (struct conn *) sock->sk;

        if (sock->sk == 0)
                return 0;

        if (cl->sockstate == SOCKSTATE_LISTENER) {
                close_port(cl);
        } else if (src_in_o->sockstate == SOCKSTATE_CONN) {
                reset_conn(src_in_o);
                BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
                kref_put(&(src_in_o->ref), free_conn);
        } else {
                BUG();
        }

        return 0;
}

int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
                int sockaddr_len)
{
        struct connlistener *listener;
        struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

        if (unlikely(sock->sk != 0))
                return -EINVAL;

        if (sockaddr_len < sizeof(struct cor_sockaddr))
                return -EINVAL;

        if (addr->type != SOCKADDRTYPE_PORT)
                return -EINVAL;

        listener = open_port(addr->addr.port);

        if (listener == 0)
                return -EADDRINUSE;

        sock->sk = (struct sock *) listener;

        return 0;
}

int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
                int sockaddr_len, int flags)
{
        struct sock_buffertracker *sbt;

        struct conn *src_sock;

        if (unlikely(sock->sk != 0))
                return -EISCONN;

        src_sock = alloc_conn(GFP_KERNEL);

        if (unlikely(src_sock == 0))
                return -ENOMEM;

        src_sock->is_client = 1;

        mutex_lock(&sock_bufferlimits_lock);
        sbt = get_sock_buffertracker(current_uid());
        mutex_unlock(&sock_bufferlimits_lock);

        if (unlikely(sbt == 0)) {
                reset_conn(src_sock);
                return -ENOMEM;
        }

        kref_get(&(src_sock->ref));

        mutex_lock(&(src_sock->rcv_lock));
        mutex_lock(&(src_sock->reversedir->rcv_lock));
        conn_init_sock_source(src_sock);
        src_sock->source.sock.sbt = sbt;
        conn_init_sock_target(src_sock->reversedir);
        mutex_unlock(&(src_sock->reversedir->rcv_lock));
        mutex_unlock(&(src_sock->rcv_lock));

        sock->sk = (struct sock *) src_sock;
        sock->state = SS_CONNECTED;

        return 0;
}

static int cor_rdytoaccept(struct connlistener *cl)
{
        int rc;
        mutex_lock(&(cl->lock));
        rc = (list_empty(&(cl->conn_queue)) == 0);
        mutex_unlock(&(cl->lock));
        return rc;
}

const struct proto_ops cor_proto_ops;

int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
        struct sock_buffertracker *sbt;

        struct connlistener *cl = (struct connlistener *) sock->sk;

        int rc = check_connlistener_state(cl);

        struct conn *src_sock_o;

        if (unlikely(rc))
                return -EINVAL;

        mutex_lock(&sock_bufferlimits_lock);
        sbt = get_sock_buffertracker(current_uid());
        mutex_unlock(&sock_bufferlimits_lock);

        if (unlikely(sbt == 0))
                return -ENOMEM;

        mutex_lock(&(cl->lock));

        if (unlikely(cl->queue_maxlen <= 0)) {
                mutex_unlock(&(cl->lock));
                return -EINVAL;
        }

        while (list_empty(&(cl->conn_queue))) {
                mutex_unlock(&(cl->lock));
                if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) {
                        kref_put(&(sbt->ref), free_sbt);
                        return -ERESTARTSYS;
                }
                mutex_lock(&(cl->lock));
        }

        src_sock_o = container_of(cl->conn_queue.next, struct conn,
                        source.sock.cl_list);

        BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK);

        list_del(cl->conn_queue.next);

        cl->queue_len--;

        mutex_unlock(&(cl->lock));

        mutex_lock(&(src_sock_o->rcv_lock));
        src_sock_o->source.sock.sbt = sbt;
        mutex_unlock(&(src_sock_o->rcv_lock));

        newsock->ops = &cor_proto_ops;
        newsock->sk = (struct sock *) src_sock_o;
        newsock->state = SS_CONNECTED;

        return 0;
}

int cor_socket_listen(struct socket *sock, int len)
{
        struct connlistener *cl = (struct connlistener *) sock->sk;

        int rc = check_connlistener_state(cl);

        if (unlikely(rc))
                return -EOPNOTSUPP;

        mutex_lock(&(cl->lock));
        cl->queue_maxlen = len;
        mutex_unlock(&(cl->lock));

        return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
        return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
        return -ENOIOCTLCMD;
}

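/*
 * Wake-up condition for a blocking cor_sendmsg(): true once the conn was
 * reset, the sender no longer waits, or buffer space is (or can be made)
 * available for the waiting length.
 */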
static int sendmsg_maypush(struct conn *src_sock_o)
{
        int ret = 0;
        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(src_sock_o->rcv_lock));
        if (unlikely(atomic_read(&(src_sock_o->isreset)) != 0)) {
                ret = 1;
        } else if (src_sock_o->source.sock.wait_len == 0) {
                ret = 1;
        } else if (src_sock_o->source.sock.alloclimit +
                        src_sock_o->data_buf.cpacket_buffer >
                        src_sock_o->data_buf.totalsize +
                        src_sock_o->data_buf.overhead) {
                ret = 1;
        } else {
                reserve_sock_buffer(src_sock_o,
                                src_sock_o->source.sock.wait_len);
                if (src_sock_o->source.sock.alloclimit +
                                src_sock_o->data_buf.cpacket_buffer >
                                src_sock_o->data_buf.totalsize +
                                src_sock_o->data_buf.overhead)
                        ret = 1;
        }
        mutex_unlock(&(src_sock_o->rcv_lock));
        mutex_unlock(&sock_bufferlimits_lock);
        return ret;
}

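/*
 * sendmsg: reserve buffer space and queue as much of the message as fits.
 * With MSG_MORE set and less than 3/4 of BUFFERLIMIT_SOCK_SOCK buffered,
 * the conn is marked for delayed flushing instead of being flushed right
 * away. Unless MSG_DONTWAIT is set, blocks until some data can be queued.
 */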
int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t total_len)
{
        __s64 copied = 0;

        struct conn *src_sock_o = (struct conn *) sock->sk;

        int rc = check_conn_state(src_sock_o);

        int flush = (msg->msg_flags & MSG_MORE) == 0;
        int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

        __s32 bufferfree;
        __u64 max = (1LL << 32) - 1;
        __u32 totallen = (total_len > max ? max : total_len);

        if (unlikely(rc))
                return -EBADF;

recv:
        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(src_sock_o->rcv_lock));

        if (unlikely(atomic_read(&(src_sock_o->isreset)) != 0)) {
                mutex_unlock(&(src_sock_o->rcv_lock));
                mutex_unlock(&sock_bufferlimits_lock);
                return -EPIPE;
        }

        reserve_sock_buffer(src_sock_o, totallen);

        bufferfree = (__s64) src_sock_o->source.sock.alloclimit +
                        (__s64) src_sock_o->data_buf.cpacket_buffer -
                        (__s64) src_sock_o->data_buf.totalsize -
                        (__s64) src_sock_o->data_buf.overhead;

        mutex_unlock(&sock_bufferlimits_lock);

        if (bufferfree <= 0) {
                if (copied == 0)
                        copied = -EAGAIN;
                goto out;
        }

        copied = receive_userbuf(src_sock_o, msg, bufferfree, bufferfree >=
                        totallen ? 0 : (src_sock_o->source.sock.alloclimit +
                        src_sock_o->data_buf.cpacket_buffer));

        if (0) {
out:
                bufferfree = (__s64) src_sock_o->source.sock.alloclimit +
                                (__s64) src_sock_o->data_buf.cpacket_buffer -
                                (__s64) src_sock_o->data_buf.totalsize -
                                (__s64) src_sock_o->data_buf.overhead;
        }

        if (copied == -EAGAIN)
                src_sock_o->source.sock.wait_len = totallen;
        else
                src_sock_o->source.sock.wait_len = 0;

        mutex_unlock(&(src_sock_o->rcv_lock));

        unreserve_sock_buffer(src_sock_o);

        mutex_lock(&sock_bufferlimits_lock);
        mutex_lock(&(src_sock_o->rcv_lock));

        if (unlikely(atomic_read(&(src_sock_o->isreset)) != 0)) {
                mutex_unlock(&(src_sock_o->rcv_lock));
                mutex_unlock(&sock_bufferlimits_lock);
                return -EPIPE;
        }

        if (flush == 0 && src_sock_o->data_buf.totalsize +
                        src_sock_o->data_buf.overhead -
                        src_sock_o->data_buf.cpacket_buffer <
                        (BUFFERLIMIT_SOCK_SOCK*3)/4) {
                if (src_sock_o->source.sock.delay_flush == 0) {
                        struct sock_buffertracker *sbt =
                                        src_sock_o->source.sock.sbt;
                        BUG_ON(sbt == 0);
                        list_add_tail(&(src_sock_o->source.sock.delflush_list),
                                        &(sbt->delflush_conns));
                }
                src_sock_o->source.sock.delay_flush = 1;
        } else {
                if (src_sock_o->source.sock.delay_flush) {
                        list_del(&(src_sock_o->source.sock.delflush_list));
                }
                src_sock_o->source.sock.delay_flush = 0;
        }

        mutex_unlock(&(src_sock_o->rcv_lock));
        mutex_unlock(&sock_bufferlimits_lock);

        if (likely(copied > 0 || bufferfree <= 0))
                flush_buf(src_sock_o);

        if (copied == -EAGAIN && blocking) {
                if (wait_event_interruptible(src_sock_o->source.sock.wait,
                                sendmsg_maypush(src_sock_o)) == 0)
                        goto recv;
                copied = -ERESTARTSYS;
        }

        BUG_ON(copied > total_len);
        return copied;
}

static int cor_readytoread(struct conn *trgt_sock_o)
{
        int rc = 0;
        mutex_lock(&(trgt_sock_o->rcv_lock));
        rc = (trgt_sock_o->data_buf.read_remaining != 0) ||
                        unlikely(atomic_read(&(trgt_sock_o->isreset)) != 0);
        mutex_unlock(&(trgt_sock_o->rcv_lock));
        return rc;
}

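/*
 * recvmsg: pull buffered data from the receive side (the reversedir conn)
 * and acknowledge it; unless MSG_DONTWAIT is set, blocks until data arrives
 * or the conn is reset.
 */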
int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                size_t total_len, int flags)
{
        struct conn *src_sock_o = (struct conn *) sock->sk;
        struct conn *trgt_sock_o;
        ssize_t copied = 0;

        int rc = check_conn_state(src_sock_o);

        int blocking = (flags & MSG_DONTWAIT) == 0;

        if (unlikely(rc))
                return -EBADF;

        trgt_sock_o = src_sock_o->reversedir;

        BUG_ON(trgt_sock_o == 0);

recv:
        mutex_lock(&(trgt_sock_o->rcv_lock));

        if (unlikely(atomic_read(&(trgt_sock_o->isreset)) != 0)) {
                copied = -EPIPE;
                goto out;
        }

        copied = databuf_pulluser(trgt_sock_o, msg);
        databuf_ackread(trgt_sock_o);

out:
        mutex_unlock(&(trgt_sock_o->rcv_lock));

        if (likely(copied > 0)) {
                refresh_conn_credits(trgt_sock_o, 0, 0);
#warning todo move unreserve calls to wake_sender
                unreserve_sock_buffer(trgt_sock_o);
                wake_sender(trgt_sock_o);
        }

        if (copied == -EAGAIN && blocking) {
                if (wait_event_interruptible(trgt_sock_o->target.sock.wait,
                                cor_readytoread(trgt_sock_o)) == 0)
                        goto recv;
                copied = -ERESTARTSYS;
        }

        return copied;
}

const struct proto_ops cor_proto_ops = {
        .family = PF_COR,
        .owner = THIS_MODULE,
        .release = cor_socket_release,
        .bind = cor_socket_bind,
        .connect = cor_socket_connect,
        .accept = cor_socket_accept,
        .listen = cor_socket_listen,
        .shutdown = cor_socket_shutdown,
        .ioctl = cor_ioctl,
        .sendmsg = cor_sendmsg,
        .recvmsg = cor_recvmsg
        /* not implemented:
        socketpair
        getname
        poll
        compat_ioctl
        setsockopt
        getsockopt
        compat_setsockopt
        compat_getsockopt
        mmap
        sendpage
        splice_read */
};

int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
        if (unlikely(protocol != 0))
                return -EPROTONOSUPPORT;

        sock->state = SS_UNCONNECTED;
        sock->ops = &cor_proto_ops;

        return 0;
}

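/*
 * Rough userspace usage sketch (untested; assumes PF_COR, SOCKADDRTYPE_PORT
 * and struct cor_sockaddr are exported to userspace headers and that the
 * port value is in the byte order expected by open_port()):
 *
 *      int lfd = socket(PF_COR, SOCK_STREAM, 0);
 *      struct cor_sockaddr addr;
 *      addr.type = SOCKADDRTYPE_PORT;
 *      addr.addr.port = 1234;
 *      bind(lfd, (struct sockaddr *) &addr, sizeof(addr));
 *      listen(lfd, 16);
 *      int cfd = accept(lfd, 0, 0);
 */
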
static struct net_proto_family cor_net_proto_family = {
        .family = PF_COR,
        .create = cor_createsock,
        .owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
        INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
        outofsockbufferspace_scheduled = 0;

        sock_register(&cor_net_proto_family);
        sock_bufferusage = 0;
        return 0;
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");