/**
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"
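
/*
 * Buffer accounting overview (inferred from the code in this file): each
 * user id gets a struct sock_buffertracker counting how much socket buffer
 * space its connections have reserved ("usage"). Trackers without waiting
 * connections sit on sock_bt_list; trackers with connections waiting for
 * buffer space sit on sock_bt_wait_list, sorted so the tracker with the
 * lowest usage is resumed first. sock_bufferusage is the global total;
 * when the per-user or global limits are approached,
 * outofsockbufferspace_work is scheduled to flush delayed buffers.
 */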

/**
 * sock_bt_wait_list and waiting_conns are ordered by min amount first, the
 * order in which resuming will happen
 */
DEFINE_MUTEX(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;

void free_sbt(struct kref *ref)
{
	struct sock_buffertracker *sbt = container_of(ref,
			struct sock_buffertracker, ref);

	BUG_ON(sbt->usage != 0);
	BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

	list_del(&(sbt->lh));
	kfree(sbt);
}
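
/*
 * Look up the buffer tracker for "uid", or allocate a fresh one if none
 * exists. Callers hold sock_bufferlimits_lock. The returned tracker has its
 * reference count raised (kref_init for a new tracker, kref_get for an
 * existing one) and may be NULL if the allocation fails.
 */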

static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
	struct sock_buffertracker *sbt;
	struct list_head *curr;

	curr = sock_bt_list.next;
	while (curr != &sock_bt_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL);
	if (sbt != 0) {
		memset(sbt, 0, sizeof(struct sock_buffertracker));
		sbt->uid = uid;
		list_add_tail(&(sbt->lh), &sock_bt_list);
		INIT_LIST_HEAD(&(sbt->delflush_conns));
		INIT_LIST_HEAD(&(sbt->waiting_conns));
		kref_init(&(sbt->ref));
	}

	if (0) {
found:
		kref_get(&(sbt->ref));
	}
	return sbt;
}

static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
		int waitingconnremoved)
{
	if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &sock_bt_list);
		return;
	}

	if (list_empty(&(sbt->waiting_conns)))
		return;

	while (sbt->lh.next != &sock_bt_wait_list) {
		struct sock_buffertracker *next = container_of(sbt->lh.next,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (sbt->usage <= next->usage)
			break;

		list_del(&(sbt->lh));
		list_add(&(sbt->lh), &(next->lh));
	}
}

static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
	int restart = 0;
	struct list_head *curr = sbt->delflush_conns.next;

	while (curr != &(sbt->delflush_conns)) {
		struct conn *src_in_o = container_of(curr, struct conn,
				source.sock.delflush_list);
		int flush = 0;

		mutex_lock(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);

		BUG_ON(src_in_o->source.sock.delay_flush == 0);

		if (src_in_o->data_buf.read_remaining != 0) {
			src_in_o->source.sock.delay_flush = 0;
			list_del(&(src_in_o->source.sock.delflush_list));
			flush = 1;
		}

		mutex_unlock(&(src_in_o->rcv_lock));

		if (flush) {
			if (restart == 0) {
				restart = 1;
				kref_get(&(sbt->ref));
				mutex_unlock(&sock_bufferlimits_lock);
			}
			flush_buf(src_in_o);
		}

		curr = curr->next;
	}

	if (restart)
		kref_put(&(sbt->ref), free_sbt);

	return restart;
}

static void oosbs_global(void)
{
	struct list_head *curr;

	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_list.prev;
	while (curr != &sock_bt_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}

static void oosbs_user(void)
{
	struct list_head *curr;

	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));

		if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
			break;

		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}
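
/*
 * Work item scheduled from _reserve_sock_buffer_inswl() when a connection
 * cannot get more buffer space. It walks the trackers and flushes
 * connections whose flush was delayed (MSG_MORE): the per-user pass is
 * preferred while global usage is below 3/4 of BUFFERLIMIT_SOCK_GLOBAL,
 * otherwise a global pass over all trackers is done.
 */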

static void outofsockbufferspace(struct work_struct *work)
{
	mutex_lock(&sock_bufferlimits_lock);
	if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
		oosbs_user();
		if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
			goto global;
	} else {
global:
		oosbs_global();
	}
	outofsockbufferspace_scheduled = 0;
	mutex_unlock(&sock_bufferlimits_lock);
}

static void _reserve_sock_buffer_inswl(struct conn *src_in_l)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct list_head *curr;

	BUG_ON(sbt == 0);

	if (list_empty(&(sbt->waiting_conns)) == 0)
		goto wlinserted;

	list_del(&(sbt->lh));

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		if (sbt->usage < currsbt->usage) {
			list_add(&(sbt->lh), curr);
			goto wlinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
	curr = sbt->waiting_conns.next;
	while (curr != &(sbt->waiting_conns)) {
		struct conn *currrconn = container_of(curr, struct conn,
				source.sock.alwait_list);
		BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
		if (src_in_l->source.sock.alloclimit <
				currrconn->source.sock.alloclimit) {
			list_add(&(src_in_l->source.sock.alwait_list), curr);
			goto wcinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(src_in_l->source.sock.alwait_list),
			&(sbt->waiting_conns));

wcinserted:
	src_in_l->source.sock.in_alwait_list = 1;

	if (outofsockbufferspace_scheduled == 0) {
		schedule_work(&outofsockbufferspace_work);
		outofsockbufferspace_scheduled = 1;
	}
}
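
/*
 * Try to raise the allocation limit of a locked sock-source connection so
 * that roughly "amount" more bytes fit into its data buffer. The request is
 * clamped to BUFFERLIMIT_SOCK_SOCK; if granting it would exceed the
 * per-user or global limit, or another tracker with usage no higher than
 * ours is already waiting, the connection is queued on the waiting list via
 * _reserve_sock_buffer_inswl() instead of getting the space now.
 */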

static void reserve_sock_buffer(struct conn *src_in_l, __u64 amount)
{
	struct sock_buffertracker *sbt = src_in_l->source.sock.sbt;
	struct sock_buffertracker *first_wait_sbt = list_empty(
			&sock_bt_wait_list) ? 0 : container_of(
			sock_bt_wait_list.next, struct sock_buffertracker, lh);

	__u32 max = (1 << 30) - 1;

	BUG_ON(sbt == 0);

	if (unlikely(amount > max))
		amount = max;

	amount += src_in_l->data_buf.totalsize + src_in_l->data_buf.overhead -
			src_in_l->data_buf.cpacket_buffer;

	if (unlikely(amount > max))
		amount = max;

	if (amount > BUFFERLIMIT_SOCK_SOCK)
		amount = BUFFERLIMIT_SOCK_SOCK;

	if (amount <= src_in_l->source.sock.alloclimit)
		return;

	if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
			first_wait_sbt != sbt &&
			first_wait_sbt->usage <= sbt->usage) ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_USER - sbt->usage ||
			amount - src_in_l->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
		_reserve_sock_buffer_inswl(src_in_l);
	} else {
		int waitingconnremoved = 0;
		sbt->usage += amount - src_in_l->source.sock.alloclimit;
		sock_bufferusage += amount - src_in_l->source.sock.alloclimit;
		src_in_l->source.sock.alloclimit = amount;

		if (src_in_l->source.sock.in_alwait_list) {
			list_del(&(src_in_l->source.sock.alwait_list));
			src_in_l->source.sock.in_alwait_list = 0;
			waitingconnremoved = 1;
		}
		_reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
	}
}

static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
	int failed = 0;

	/* loop while there are still waiting conns and reservation succeeds */
	while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
		struct conn *src_in_o = container_of(sbt->waiting_conns.next,
				struct conn, source.sock.alwait_list);
		mutex_lock(&(src_in_o->rcv_lock));

		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
		BUG_ON(src_in_o->source.sock.in_alwait_list == 0);
		BUG_ON(src_in_o->source.sock.wait_len == 0);

		reserve_sock_buffer(src_in_o, src_in_o->source.sock.wait_len);

		if (src_in_o->source.sock.alloclimit +
				src_in_o->data_buf.cpacket_buffer <=
				src_in_o->data_buf.totalsize +
				src_in_o->data_buf.overhead) {
			failed = 1;
			goto out;
		}

		wake_up_interruptible(&(src_in_o->source.sock.wait));

out:
		mutex_unlock(&(src_in_o->rcv_lock));
	}

	return failed;
}

static void resume_bufferwaiting_socks(void)
{
	struct list_head *curr = sock_bt_wait_list.next;

	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		curr = curr->next;

		if (_resume_bufferwaiting_socks(currsbt))
			return;
	}
}
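
/*
 * Give back reserved buffer space a connection no longer needs: the
 * allocation limit is lowered to the space actually occupied (totalsize +
 * overhead - cpacket_buffer), the per-user and global counters are adjusted
 * to match, the tracker is re-sorted within sock_bt_wait_list, and, if
 * anything was freed, waiting sockets are woken via
 * resume_bufferwaiting_socks().
 */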

void unreserve_sock_buffer(struct conn *cn)
{
	int freed = 0;
	struct sock_buffertracker *sbt;

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(cn->rcv_lock));

	if (cn->sourcetype != SOURCE_SOCK)
		goto out;

	sbt = cn->source.sock.sbt;
	BUG_ON(sbt == 0);

	if (cn->data_buf.totalsize + cn->data_buf.overhead <=
			cn->source.sock.alloclimit +
			cn->data_buf.cpacket_buffer)
		goto out;

	freed = 1;

	BUG_ON(cn->source.sock.alloclimit > sbt->usage);
	BUG_ON(cn->source.sock.alloclimit > sock_bufferusage);
	BUG_ON(cn->data_buf.cpacket_buffer > cn->data_buf.totalsize +
			cn->data_buf.overhead);

	sbt->usage -= cn->source.sock.alloclimit;
	sbt->usage += cn->data_buf.totalsize;
	sbt->usage += cn->data_buf.overhead;
	sbt->usage -= cn->data_buf.cpacket_buffer;

	sock_bufferusage -= cn->source.sock.alloclimit;
	sock_bufferusage += cn->data_buf.totalsize;
	sock_bufferusage += cn->data_buf.overhead;
	sock_bufferusage -= cn->data_buf.cpacket_buffer;

	cn->source.sock.alloclimit = cn->data_buf.totalsize +
			cn->data_buf.overhead - cn->data_buf.cpacket_buffer;

	if (cn->source.sock.alloclimit == 0 &&
			cn->source.sock.in_alwait_list) {
		list_del(&(cn->source.sock.alwait_list));
		cn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
	}

	if (list_empty(&(sbt->waiting_conns)))
		goto out;

	while (sbt->lh.prev != &sock_bt_wait_list) {
		struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (prevsbt->usage <= sbt->usage)
			break;

		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &(prevsbt->lh));
	}

out:
	mutex_unlock(&(cn->rcv_lock));

	if (freed)
		resume_bufferwaiting_socks();

	mutex_unlock(&sock_bufferlimits_lock);
}

static int check_connlistener_state(struct connlistener *cl)
{
	if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER))
		return 0;

	return 1;
}

static int check_conn_state(struct conn *cn)
{
	if (likely(cn != 0 && cn->sockstate == SOCKSTATE_CONN))
		return 0;

	return 1;
}
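
/*
 * The functions below back cor_proto_ops for PF_COR sockets. sock->sk is
 * reused to carry either a struct connlistener (listening sockets) or a
 * struct conn (connected sockets); the sockstate field at the start of both
 * structures tells them apart.
 */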

int cor_socket_release(struct socket *sock)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;
	struct conn *src_in_o = (struct conn *) sock->sk;

	if (sock->sk == 0)
		return 0;

	if (cl->sockstate == SOCKSTATE_LISTENER) {
		close_port(cl);
	} else if (src_in_o->sockstate == SOCKSTATE_CONN) {
		reset_conn(src_in_o);
		BUG_ON(src_in_o->sourcetype != SOURCE_SOCK);
		kref_put(&(src_in_o->ref), free_conn);
	} else {
		BUG();
	}

	return 0;
}

int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
		int sockaddr_len)
{
	struct connlistener *listener;
	struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

	if (unlikely(sock->sk != 0))
		return -EINVAL;

	if (sockaddr_len < sizeof(struct cor_sockaddr))
		return -EINVAL;

	if (addr->type != SOCKADDRTYPE_PORT)
		return -EINVAL;

	listener = open_port(addr->addr.port);

	if (listener == 0)
		return -EADDRINUSE;

	sock->sk = (struct sock *) listener;

	return 0;
}

int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
		int sockaddr_len, int flags)
{
	struct sock_buffertracker *sbt;

	struct conn *src_sock;

	if (unlikely(sock->sk != 0))
		return -EISCONN;

	src_sock = alloc_conn(GFP_KERNEL);

	if (unlikely(src_sock == 0))
		return -ENOMEM;

	src_sock->is_client = 1;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		reset_conn(src_sock);
		return -ENOMEM;
	}

	kref_get(&(src_sock->ref));

	mutex_lock(&(src_sock->rcv_lock));
	mutex_lock(&(src_sock->reversedir->rcv_lock));
	conn_init_sock_source(src_sock);
	src_sock->source.sock.sbt = sbt;
	conn_init_sock_target(src_sock->reversedir);
	mutex_unlock(&(src_sock->reversedir->rcv_lock));
	mutex_unlock(&(src_sock->rcv_lock));

	sock->sk = (struct sock *) src_sock;
	sock->state = SS_CONNECTED;

	return 0;
}

static int cor_rdytoaccept(struct connlistener *cl)
{
	int rc;
	mutex_lock(&(cl->lock));
	rc = (list_empty(&(cl->conn_queue)) == 0);
	mutex_unlock(&(cl->lock));
	return rc;
}

const struct proto_ops cor_proto_ops;

int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock_buffertracker *sbt;

	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	struct conn *src_sock_o;

	if (unlikely(rc))
		return -EINVAL;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0))
		return -ENOMEM;

	mutex_lock(&(cl->lock));

	if (unlikely(cl->queue_maxlen <= 0)) {
		mutex_unlock(&(cl->lock));
		return -EINVAL;
	}

	while (list_empty(&(cl->conn_queue))) {
		mutex_unlock(&(cl->lock));
		if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) {
			kref_put(&(sbt->ref), free_sbt);
			return -ERESTARTSYS;
		}
		mutex_lock(&(cl->lock));
	}

	src_sock_o = container_of(cl->conn_queue.next, struct conn,
			source.sock.cl_list);

	BUG_ON(src_sock_o->sourcetype != SOURCE_SOCK);

	list_del(cl->conn_queue.next);

	cl->queue_len--;

	mutex_unlock(&(cl->lock));

	mutex_lock(&(src_sock_o->rcv_lock));
	src_sock_o->source.sock.sbt = sbt;
	mutex_unlock(&(src_sock_o->rcv_lock));

	newsock->ops = &cor_proto_ops;
	newsock->sk = (struct sock *) src_sock_o;
	newsock->state = SS_CONNECTED;

	return 0;
}

int cor_socket_listen(struct socket *sock, int len)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	if (unlikely(rc))
		return -EOPNOTSUPP;

	mutex_lock(&(cl->lock));
	cl->queue_maxlen = len;
	mutex_unlock(&(cl->lock));

	return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
	return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	return -ENOIOCTLCMD;
}

static int sendmsg_maypush(struct conn *src_sock_o)
{
	int ret = 0;
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(src_sock_o->rcv_lock));
	if (unlikely(atomic_read(&(src_sock_o->isreset)) != 0)) {
		ret = 1;
	} else if (src_sock_o->source.sock.wait_len == 0) {
		ret = 1;
	} else if (src_sock_o->source.sock.alloclimit +
			src_sock_o->data_buf.cpacket_buffer >
			src_sock_o->data_buf.totalsize +
			src_sock_o->data_buf.overhead) {
		ret = 1;
	} else {
		reserve_sock_buffer(src_sock_o,
				src_sock_o->source.sock.wait_len);
		if (src_sock_o->source.sock.alloclimit +
				src_sock_o->data_buf.cpacket_buffer >
				src_sock_o->data_buf.totalsize +
				src_sock_o->data_buf.overhead)
			ret = 1;
	}
	mutex_unlock(&(src_sock_o->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);
	return ret;
}
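
/*
 * Send path: reserve buffer space for the message, copy the user data into
 * the connection's data_buf via receive_userbuf(), then decide whether to
 * flush right away. With MSG_MORE set and less than 3/4 of
 * BUFFERLIMIT_SOCK_SOCK buffered, the flush is delayed and the connection
 * is queued on its tracker's delflush_conns list. Without MSG_DONTWAIT the
 * call blocks on source.sock.wait until sendmsg_maypush() reports that more
 * buffer space is available (or the conn was reset).
 */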

int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len)
{
	__s64 copied = 0;

	struct conn *src_sock_o = (struct conn *) sock->sk;

	int rc = check_conn_state(src_sock_o);

	int flush = (msg->msg_flags & MSG_MORE) == 0;
	int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

	__s32 bufferfree;
	__u64 max = (1LL << 32) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	if (unlikely(rc))
		return -EBADF;

recv:
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(src_sock_o->rcv_lock));

	if (unlikely(atomic_read(&(src_sock_o->isreset)) != 0)) {
		mutex_unlock(&sock_bufferlimits_lock);
		copied = -EPIPE;
		goto out;
	}

	reserve_sock_buffer(src_sock_o, totallen);

	bufferfree = (__s64) src_sock_o->source.sock.alloclimit +
			(__s64) src_sock_o->data_buf.cpacket_buffer -
			(__s64) src_sock_o->data_buf.totalsize -
			(__s64) src_sock_o->data_buf.overhead;

	mutex_unlock(&sock_bufferlimits_lock);

	if (bufferfree <= 0) {
		if (copied == 0)
			copied = -EAGAIN;
		goto out;
	}

	copied = receive_userbuf(src_sock_o, msg, bufferfree, bufferfree >=
			totallen ? 0 : (src_sock_o->source.sock.alloclimit +
			src_sock_o->data_buf.cpacket_buffer));

	if (0) {
out:
		bufferfree = (__s64) src_sock_o->source.sock.alloclimit +
				(__s64) src_sock_o->data_buf.cpacket_buffer -
				(__s64) src_sock_o->data_buf.totalsize -
				(__s64) src_sock_o->data_buf.overhead;
	}

	if (copied == -EAGAIN)
		src_sock_o->source.sock.wait_len = totallen;
	else
		src_sock_o->source.sock.wait_len = 0;

	mutex_unlock(&(src_sock_o->rcv_lock));

	unreserve_sock_buffer(src_sock_o);

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(src_sock_o->rcv_lock));

	if (flush == 0 && src_sock_o->data_buf.totalsize +
			src_sock_o->data_buf.overhead -
			src_sock_o->data_buf.cpacket_buffer <
			(BUFFERLIMIT_SOCK_SOCK*3)/4) {
		if (src_sock_o->source.sock.delay_flush == 0) {
			struct sock_buffertracker *sbt =
					src_sock_o->source.sock.sbt;
			list_add_tail(&(src_sock_o->source.sock.delflush_list),
					&(sbt->delflush_conns));
		}
		src_sock_o->source.sock.delay_flush = 1;
	} else {
		if (src_sock_o->source.sock.delay_flush) {
			list_del(&(src_sock_o->source.sock.delflush_list));
		}
		src_sock_o->source.sock.delay_flush = 0;
	}

	mutex_unlock(&(src_sock_o->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);

	if (likely(copied > 0 || bufferfree <= 0))
		flush_buf(src_sock_o);

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(src_sock_o->source.sock.wait,
				sendmsg_maypush(src_sock_o)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	BUG_ON(copied > total_len);
	return copied;
}

static int cor_readytoread(struct conn *trgt_sock_o)
{
	int rc = 0;
	mutex_lock(&(trgt_sock_o->rcv_lock));
	rc = (trgt_sock_o->data_buf.read_remaining != 0) ||
			unlikely(atomic_read(&(trgt_sock_o->isreset)) != 0);
	mutex_unlock(&(trgt_sock_o->rcv_lock));
	return rc;
}
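
/*
 * Receive path: data is pulled from the reverse direction's data_buf with
 * databuf_pulluser() and acknowledged with databuf_ackread(). After a
 * successful read the freed space is returned (unreserve_sock_buffer) and
 * the sender is woken; without MSG_DONTWAIT the call blocks until
 * cor_readytoread() reports pending data or a reset.
 */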

int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len, int flags)
{
	struct conn *src_sock_o = (struct conn *) sock->sk;
	struct conn *trgt_sock_o;
	/* signed: copied also carries error codes such as -EPIPE/-EAGAIN */
	ssize_t copied = 0;

	int rc = check_conn_state(src_sock_o);

	int blocking = (flags & MSG_DONTWAIT) == 0;

	if (unlikely(rc))
		return -EBADF;

	trgt_sock_o = src_sock_o->reversedir;

	BUG_ON(trgt_sock_o == 0);

recv:
	mutex_lock(&(trgt_sock_o->rcv_lock));

	if (unlikely(atomic_read(&(trgt_sock_o->isreset)) != 0)) {
		copied = -EPIPE;
		goto out;
	}

	copied = databuf_pulluser(trgt_sock_o, msg);
	databuf_ackread(trgt_sock_o);

out:
	mutex_unlock(&(trgt_sock_o->rcv_lock));

	if (likely(copied > 0)) {
		refresh_conn_credits(trgt_sock_o, 0, 0);
#warning todo move unreserve calls to wake_sender
		unreserve_sock_buffer(trgt_sock_o);
		wake_sender(trgt_sock_o);
	}

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(trgt_sock_o->target.sock.wait,
				cor_readytoread(trgt_sock_o)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	return copied;
}

const struct proto_ops cor_proto_ops = {
	.family = PF_COR,
	.owner = THIS_MODULE,
	.release = cor_socket_release,
	.bind = cor_socket_bind,
	.connect = cor_socket_connect,
	.accept = cor_socket_accept,
	.listen = cor_socket_listen,
	.shutdown = cor_socket_shutdown,
	.ioctl = cor_ioctl,
	.sendmsg = cor_sendmsg,
	.recvmsg = cor_recvmsg
};

/*socketpair
getname
poll
compat_ioctl
setsockopt
getsockopt
compat_setsockopt
compat_getsockopt
mmap
sendpage
splice_read*/
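
/*
 * Hypothetical userspace usage sketch (not part of this file; it assumes
 * PF_COR, struct cor_sockaddr, and SOCKADDRTYPE_PORT are exported to
 * userspace, that the chosen socket type is irrelevant since
 * cor_createsock() does not check it, and that "some_port" is a port the
 * peer actually listens on, none of which this file shows):
 *
 *	int lfd = socket(PF_COR, SOCK_STREAM, 0);
 *	struct cor_sockaddr addr;
 *	memset(&addr, 0, sizeof(addr));
 *	addr.type = SOCKADDRTYPE_PORT;
 *	addr.addr.port = some_port;
 *	bind(lfd, (struct sockaddr *) &addr, sizeof(addr)); // open_port()
 *	listen(lfd, 16);                   // sets cl->queue_maxlen = 16
 *	int cfd = accept(lfd, 0, 0);       // cor_socket_accept()
 *
 * cor_createsock() below only accepts protocol 0 and leaves the new socket
 * in SS_UNCONNECTED state with cor_proto_ops attached.
 */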

int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	sock->state = SS_UNCONNECTED;
	sock->ops = &cor_proto_ops;

	return 0;
}

static struct net_proto_family cor_net_proto_family = {
	.family = PF_COR,
	.create = cor_createsock,
	.owner = THIS_MODULE
};

static int __init cor_sock_init(void)
{
	INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
	outofsockbufferspace_scheduled = 0;

	sock_register(&cor_net_proto_family);
	sock_bufferusage = 0;
	return 0;
}

module_init(cor_sock_init);

MODULE_LICENSE("GPL");