set credits on new connections
[cor_2_6_31.git] / net/cor/sock.c
/*
 * Connection oriented routing
 * Copyright (C) 2007-2010 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
#include <net/sock.h>
#include <linux/net.h>
#include <asm/uaccess.h>

#include "cor.h"
/**
 * sock_bt_wait_list and waiting_conns are ordered by minimum amount first,
 * which is the order in which resuming will happen.
 */
DEFINE_MUTEX(sock_bufferlimits_lock);
LIST_HEAD(sock_bt_list);
LIST_HEAD(sock_bt_wait_list);
static __u64 sock_bufferusage;

static struct work_struct outofsockbufferspace_work;
static int outofsockbufferspace_scheduled;
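
/*
 * Accounting model used throughout this file: every uid gets one
 * struct sock_buffertracker ("sbt"). Trackers with no buffer-waiting
 * connections sit on sock_bt_list; trackers with waiting connections sit
 * on sock_bt_wait_list, sorted by usage so that the lightest user is
 * resumed first. The space a connection may still fill is computed the
 * same way everywhere below:
 *
 *	bufferfree = alloclimit + cpacket_buffer - totalsize - overhead
 *
 * and new data is accepted only while bufferfree is positive.
 */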
void free_sbt(struct kref *ref)
{
	struct sock_buffertracker *sbt = container_of(ref,
			struct sock_buffertracker, ref);

	BUG_ON(sbt->usage != 0);
	BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);

	list_del(&(sbt->lh));
	kfree(sbt);
}
static struct sock_buffertracker *get_sock_buffertracker(uid_t uid)
{
	struct sock_buffertracker *sbt;
	struct list_head *curr;

	curr = sock_bt_list.next;
	while (curr != &sock_bt_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		sbt = container_of(curr, struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (sbt->uid == uid)
			goto found;
		curr = curr->next;
	}

	sbt = kmalloc(sizeof(struct sock_buffertracker), GFP_KERNEL);
	if (sbt != 0) {
		memset(sbt, 0, sizeof(struct sock_buffertracker));
		sbt->uid = uid;
		list_add_tail(&(sbt->lh), &sock_bt_list);
		INIT_LIST_HEAD(&(sbt->delflush_conns));
		INIT_LIST_HEAD(&(sbt->waiting_conns));
		kref_init(&(sbt->ref));
	}

	if (0) {
found:
		kref_get(&(sbt->ref));
	}

	return sbt;
}
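
/*
 * The "if (0) { found: ... }" block above is only reachable via goto: it
 * lets both lookup loops take an extra reference on an existing tracker
 * while sharing the common "return sbt" tail with the allocation path.
 * The same idiom appears in the oosbs_* functions below for restarting a
 * list walk.
 */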
static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker *sbt,
		int waitingconnremoved)
{
	if (waitingconnremoved && list_empty(&(sbt->waiting_conns))) {
		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &sock_bt_list);
		return;
	}

	if (list_empty(&(sbt->waiting_conns)))
		return;

	while (sbt->lh.next != &sock_bt_wait_list) {
		struct sock_buffertracker *next = container_of(sbt->lh.next,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (sbt->usage <= next->usage)
			break;

		list_del(&(sbt->lh));
		list_add(&(sbt->lh), &(next->lh));
	}
}
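
/*
 * Reordering example: with sock_bt_wait_list sorted by usage as
 * (a:100, b:200, c:300), raising a's usage to 250 bubbles it one slot
 * towards the tail, giving (b:200, a:250, c:300), so the list stays
 * sorted and resuming still starts at the lightest user.
 */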
static int oosbs_resumesbt(struct sock_buffertracker *sbt)
{
	int restart = 0;
	struct list_head *curr = sbt->delflush_conns.next;

	while (curr != &(sbt->delflush_conns)) {
		struct conn *rconn = container_of(curr, struct conn,
				source.sock.delflush_list);
		int flush = 0;

		/* advance before the entry is possibly list_del'd below */
		curr = curr->next;

		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->sourcetype != SOURCE_SOCK);

		BUG_ON(rconn->source.sock.delay_flush == 0);

		if (rconn->data_buf.read_remaining != 0) {
			rconn->source.sock.delay_flush = 0;
			list_del(&(rconn->source.sock.delflush_list));
			flush = 1;
		}

		mutex_unlock(&(rconn->rcv_lock));

		if (flush) {
			if (restart == 0) {
				restart = 1;
				kref_get(&(sbt->ref));
				mutex_unlock(&sock_bufferlimits_lock);
			}
			flush_buf(rconn);
		}
	}

	if (restart)
		kref_put(&(sbt->ref), free_sbt);

	return restart;
}
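
/*
 * A nonzero return from oosbs_resumesbt() means sock_bufferlimits_lock
 * was dropped in order to call flush_buf(); the kref taken before
 * unlocking keeps the tracker alive across that window. Callers must then
 * restart their list walk from scratch, because both tracker lists may
 * have changed in the meantime.
 */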
static void oosbs_global(void)
{
	struct list_head *curr;

	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_list.prev;
	while (curr != &sock_bt_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)) == 0);
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));
		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}
static void oosbs_user(void)
{
	struct list_head *curr;

	if (0) {
restart:
		mutex_lock(&sock_bufferlimits_lock);
	}

	curr = sock_bt_wait_list.prev;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *sbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(sbt->waiting_conns)));

		if (sbt->usage < (BUFFERLIMIT_SOCK_USER * 3 / 4))
			break;

		if (oosbs_resumesbt(sbt))
			goto restart;
		curr = curr->prev;
	}
}
static void outofsockbufferspace(struct work_struct *work)
{
	mutex_lock(&sock_bufferlimits_lock);
	if (sock_bufferusage < (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4)) {
		oosbs_user();
		if (sock_bufferusage >= (BUFFERLIMIT_SOCK_GLOBAL * 3 / 4))
			goto global;
	} else {
global:
		oosbs_global();
	}
	outofsockbufferspace_scheduled = 0;
	mutex_unlock(&sock_bufferlimits_lock);
}
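
/*
 * The worker above flushes in two stages: while global usage is below 3/4
 * of BUFFERLIMIT_SOCK_GLOBAL, only users above 3/4 of their per-user limit
 * are flushed (oosbs_user); if that is not enough, or the global limit is
 * already close, everything flushable is flushed (oosbs_global). Both
 * walks run from the tail of lists sorted by usage, so the heaviest users
 * are flushed first.
 */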
static void _reserve_sock_buffer_inswl(struct conn *rconn)
{
	struct sock_buffertracker *sbt = rconn->source.sock.sbt;
	struct list_head *curr;

	BUG_ON(sbt == 0);

	if (list_empty(&(sbt->waiting_conns)) == 0)
		goto wlinserted;

	list_del(&(sbt->lh));

	curr = sock_bt_wait_list.next;
	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		if (sbt->usage < currsbt->usage) {
			/* insert before curr to keep the list sorted */
			list_add_tail(&(sbt->lh), curr);
			goto wlinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(sbt->lh), &sock_bt_wait_list);

wlinserted:
	curr = sbt->waiting_conns.next;
	while (curr != &(sbt->waiting_conns)) {
		struct conn *currrconn = container_of(curr, struct conn,
				source.sock.alwait_list);
		BUG_ON(currrconn->sourcetype != SOURCE_SOCK);
		if (rconn->source.sock.alloclimit <
				currrconn->source.sock.alloclimit) {
			/* insert before currrconn to keep the list sorted */
			list_add_tail(&(rconn->source.sock.alwait_list), curr);
			goto wcinserted;
		}
		curr = curr->next;
	}

	list_add_tail(&(rconn->source.sock.alwait_list), &(sbt->waiting_conns));

wcinserted:
	rconn->source.sock.in_alwait_list = 1;

	if (outofsockbufferspace_scheduled == 0) {
		schedule_work(&outofsockbufferspace_work);
		outofsockbufferspace_scheduled = 1;
	}
}
static void reserve_sock_buffer(struct conn *rconn, __u64 amount)
{
	struct sock_buffertracker *sbt = rconn->source.sock.sbt;
	struct sock_buffertracker *first_wait_sbt = list_empty(
			&sock_bt_wait_list) ? 0 : container_of(
			sock_bt_wait_list.next, struct sock_buffertracker, lh);

	__u32 max = (1 << 30) - 1;

	BUG_ON(sbt == 0);

	if (unlikely(amount > max))
		amount = max;

	amount += rconn->data_buf.totalsize + rconn->data_buf.overhead -
			rconn->data_buf.cpacket_buffer;

	if (unlikely(amount > max))
		amount = max;

	if (amount > BUFFERLIMIT_SOCK_SOCK)
		amount = BUFFERLIMIT_SOCK_SOCK;

	if (amount <= rconn->source.sock.alloclimit)
		return;

	if ((list_empty(&sock_bt_wait_list) == 0 && first_wait_sbt != 0 &&
			first_wait_sbt != sbt &&
			first_wait_sbt->usage <= sbt->usage) ||
			amount - rconn->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_USER - sbt->usage ||
			amount - rconn->source.sock.alloclimit >
			BUFFERLIMIT_SOCK_GLOBAL - sock_bufferusage) {
		_reserve_sock_buffer_inswl(rconn);
	} else {
		int waitingconnremoved = 0;
		sbt->usage += amount - rconn->source.sock.alloclimit;
		sock_bufferusage += amount - rconn->source.sock.alloclimit;
		rconn->source.sock.alloclimit = amount;

		if (rconn->source.sock.in_alwait_list) {
			list_del(&(rconn->source.sock.alwait_list));
			rconn->source.sock.in_alwait_list = 0;
			waitingconnremoved = 1;
		}
		_reserve_sock_buffer_reord_bt(sbt, waitingconnremoved);
	}
}
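
/*
 * Summary of the policy above: the requested "amount" of new bytes is
 * converted into an absolute alloclimit by adding what is already
 * buffered, then clamped to 2^30 - 1 and to BUFFERLIMIT_SOCK_SOCK. The
 * grant is refused (the connection is queued on the wait list instead)
 * whenever it would overtake a lighter user that is already waiting, or
 * would exceed BUFFERLIMIT_SOCK_USER for this uid or
 * BUFFERLIMIT_SOCK_GLOBAL for the whole system.
 */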
static int _resume_bufferwaiting_socks(struct sock_buffertracker *sbt)
{
	int failed = 0;

	while (list_empty(&(sbt->waiting_conns)) == 0 && failed == 0) {
		struct conn *rconn = container_of(sbt->waiting_conns.next,
				struct conn, source.sock.alwait_list);
		mutex_lock(&(rconn->rcv_lock));

		BUG_ON(rconn->sourcetype != SOURCE_SOCK);
		BUG_ON(rconn->source.sock.in_alwait_list == 0);
		BUG_ON(rconn->source.sock.wait_len == 0);

		reserve_sock_buffer(rconn, rconn->source.sock.wait_len);

		if (rconn->source.sock.alloclimit +
				rconn->data_buf.cpacket_buffer <=
				rconn->data_buf.totalsize +
				rconn->data_buf.overhead) {
			failed = 1;
			goto out;
		}

		wake_up_interruptible(&(rconn->source.sock.wait));

out:
		mutex_unlock(&(rconn->rcv_lock));
	}

	return failed;
}
static void resume_bufferwaiting_socks(void)
{
	struct list_head *curr = sock_bt_wait_list.next;

	while (curr != &sock_bt_wait_list) {
		struct sock_buffertracker *currsbt = container_of(curr,
				struct sock_buffertracker, lh);
		BUG_ON(list_empty(&(currsbt->waiting_conns)));
		curr = curr->next;

		if (_resume_bufferwaiting_socks(currsbt))
			return;
	}
}
void unreserve_sock_buffer(struct conn *conn)
{
	int freed = 0;
	struct sock_buffertracker *sbt;

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(conn->rcv_lock));

	if (conn->sourcetype != SOURCE_SOCK)
		goto out;

	sbt = conn->source.sock.sbt;
	BUG_ON(sbt == 0);

	/* nothing to give back unless the reservation exceeds buffered data */
	if (conn->data_buf.totalsize + conn->data_buf.overhead >=
			conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer)
		goto out;

	freed = 1;

	BUG_ON(conn->source.sock.alloclimit > sbt->usage);
	BUG_ON(conn->source.sock.alloclimit > sock_bufferusage);

	sbt->usage -= conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer -
			conn->data_buf.totalsize -
			conn->data_buf.overhead;

	sock_bufferusage -= conn->source.sock.alloclimit +
			conn->data_buf.cpacket_buffer -
			conn->data_buf.totalsize - conn->data_buf.overhead;

	conn->source.sock.alloclimit = conn->data_buf.totalsize +
			conn->data_buf.overhead - conn->data_buf.cpacket_buffer;

	if (conn->source.sock.alloclimit == 0 &&
			conn->source.sock.in_alwait_list) {
		list_del(&(conn->source.sock.alwait_list));
		conn->source.sock.in_alwait_list = 0;

		if (list_empty(&(sbt->waiting_conns))) {
			list_del(&(sbt->lh));
			list_add_tail(&(sbt->lh), &sock_bt_list);
		}
	}

	if (list_empty(&(sbt->waiting_conns)))
		goto out;

	while (sbt->lh.prev != &sock_bt_wait_list) {
		struct sock_buffertracker *prevsbt = container_of(sbt->lh.prev,
				struct sock_buffertracker, lh);

		BUG_ON(sbt->lh.next == &sock_bt_list);

		if (prevsbt->usage <= sbt->usage)
			break;

		list_del(&(sbt->lh));
		list_add_tail(&(sbt->lh), &(prevsbt->lh));
	}

out:
	mutex_unlock(&(conn->rcv_lock));

	if (freed)
		resume_bufferwaiting_socks();

	mutex_unlock(&sock_bufferlimits_lock);
}
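
/*
 * Lock ordering in this file: sock_bufferlimits_lock is always taken
 * before a connection's rcv_lock, never the other way around.
 * resume_bufferwaiting_socks() therefore runs with only
 * sock_bufferlimits_lock held and takes each rcv_lock as it goes.
 */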
static int check_connlistener_state(struct connlistener *cl)
{
	if (likely(cl != 0 && cl->sockstate == SOCKSTATE_LISTENER))
		return 0;

	return 1;
}

static int check_conn_state(struct conn *conn)
{
	if (likely(conn != 0 && conn->sockstate == SOCKSTATE_CONN))
		return 0;

	return 1;
}
int cor_socket_release(struct socket *sock)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;
	struct conn *rconn = (struct conn *) sock->sk;

	if (sock->sk == 0)
		return 0;

	if (cl->sockstate == SOCKSTATE_LISTENER) {
		close_port(cl);
	} else if (rconn->sockstate == SOCKSTATE_CONN) {
		reset_conn(rconn);
		BUG_ON(rconn->sourcetype != SOURCE_SOCK);
		kref_put(&(rconn->ref), free_conn);
	} else {
		BUG();
	}

	return 0;
}
int cor_socket_bind(struct socket *sock, struct sockaddr *myaddr,
		int sockaddr_len)
{
	struct connlistener *listener;
	struct cor_sockaddr *addr = (struct cor_sockaddr *) myaddr;

	if (unlikely(sock->sk != 0))
		return -EINVAL;

	if (sockaddr_len < sizeof(struct cor_sockaddr))
		return -EINVAL;

	if (addr->type != SOCKADDRTYPE_PORT)
		return -EINVAL;

	listener = open_port(addr->addr.port);

	if (listener == 0)
		return -EADDRINUSE;

	sock->sk = (struct sock *) listener;

	return 0;
}
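
/*
 * Userspace sketch of setting up a listener, matching the checks above.
 * The exact struct cor_sockaddr layout lives in cor.h; the field names
 * and the byte order of the port are assumptions here, and the socket
 * type is arbitrary since cor_createsock() ignores it:
 *
 *	struct cor_sockaddr addr;
 *	int s = socket(PF_COR, SOCK_STREAM, 0);
 *
 *	memset(&addr, 0, sizeof(addr));
 *	addr.type = SOCKADDRTYPE_PORT;
 *	addr.addr.port = port;
 *	bind(s, (struct sockaddr *) &addr, sizeof(addr));
 *	listen(s, 16);
 */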
int cor_socket_connect(struct socket *sock, struct sockaddr *vaddr,
		int sockaddr_len, int flags)
{
	struct sock_buffertracker *sbt;

	struct conn *rconn;

	if (unlikely(sock->sk != 0))
		return -EISCONN;

	rconn = alloc_conn(GFP_KERNEL);

	if (unlikely(rconn == 0))
		return -ENOMEM;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0)) {
		reset_conn(rconn);
		return -ENOMEM;
	}

	kref_get(&(rconn->ref));

	mutex_lock(&(rconn->rcv_lock));
	mutex_lock(&(rconn->reversedir->rcv_lock));
	conn_init_sock_source(rconn);
	rconn->source.sock.sbt = sbt;
	conn_init_sock_target(rconn->reversedir);
	rconn->source.sock.is_client = 1;
	mutex_unlock(&(rconn->reversedir->rcv_lock));
	mutex_unlock(&(rconn->rcv_lock));

	sock->sk = (struct sock *) rconn;
	sock->state = SS_CONNECTED;

	return 0;
}
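
/*
 * Minimal client-side sketch under the same assumptions as the bind
 * example above. cor_socket_connect() in this version does not inspect
 * vaddr, so the address passed to connect() ("anyaddr" below is a
 * placeholder) only has to be large enough to satisfy libc:
 *
 *	int s = socket(PF_COR, SOCK_STREAM, 0);
 *
 *	connect(s, (struct sockaddr *) &anyaddr, sizeof(anyaddr));
 *	send(s, "hi", 2, MSG_MORE);
 *	recv(s, buf, sizeof(buf), 0);
 */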
static int cor_rdytoaccept(struct connlistener *cl)
{
	int rc;
	mutex_lock(&(cl->lock));
	rc = (list_empty(&(cl->conn_queue)) == 0);
	mutex_unlock(&(cl->lock));
	return rc;
}

const struct proto_ops cor_proto_ops;
int cor_socket_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock_buffertracker *sbt;

	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	struct conn *newconn;

	if (unlikely(rc))
		return -EINVAL;

	mutex_lock(&sock_bufferlimits_lock);
	sbt = get_sock_buffertracker(current_uid());
	mutex_unlock(&sock_bufferlimits_lock);

	if (unlikely(sbt == 0))
		return -ENOMEM;

	mutex_lock(&(cl->lock));

	if (unlikely(cl->queue_maxlen <= 0)) {
		mutex_unlock(&(cl->lock));
		return -EINVAL;
	}

	while (list_empty(&(cl->conn_queue))) {
		mutex_unlock(&(cl->lock));
		if (wait_event_interruptible(cl->wait, cor_rdytoaccept(cl))) {
			kref_put(&(sbt->ref), free_sbt);
			return -ERESTARTSYS;
		}
		mutex_lock(&(cl->lock));
	}

	newconn = container_of(cl->conn_queue.next, struct conn,
			source.sock.cl_list);

	BUG_ON(newconn->sourcetype != SOURCE_SOCK);

	list_del(cl->conn_queue.next);

	cl->queue_len--;

	mutex_unlock(&(cl->lock));

	mutex_lock(&(newconn->rcv_lock));
	newconn->source.sock.sbt = sbt;
	mutex_unlock(&(newconn->rcv_lock));

	newsock->ops = &cor_proto_ops;
	newsock->sk = (struct sock *) newconn;
	newsock->state = SS_CONNECTED;

	return 0;
}
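
/*
 * Note that the tracker charged for an accepted connection belongs to the
 * uid of the process calling accept() (looked up via current_uid() above),
 * not to the uid of the process that created the listener.
 */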
int cor_socket_listen(struct socket *sock, int len)
{
	struct connlistener *cl = (struct connlistener *) sock->sk;

	int rc = check_connlistener_state(cl);

	if (unlikely(rc))
		return -EOPNOTSUPP;

	mutex_lock(&(cl->lock));
	cl->queue_maxlen = len;
	mutex_unlock(&(cl->lock));

	return 0;
}

int cor_socket_shutdown(struct socket *sock, int flags)
{
	return -ENOTSUPP;
}

int cor_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	return -ENOIOCTLCMD;
}
static int sendmsg_maypush(struct conn *rconn)
{
	int ret = 0;
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));
	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		ret = 1;
	} else if (rconn->source.sock.wait_len == 0) {
		ret = 1;
	} else if (rconn->source.sock.alloclimit +
			rconn->data_buf.cpacket_buffer >
			rconn->data_buf.totalsize +
			rconn->data_buf.overhead) {
		ret = 1;
	} else {
		reserve_sock_buffer(rconn, rconn->source.sock.wait_len);
		if (rconn->source.sock.alloclimit +
				rconn->data_buf.cpacket_buffer >
				rconn->data_buf.totalsize +
				rconn->data_buf.overhead)
			ret = 1;
	}
	mutex_unlock(&(rconn->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);
	return ret;
}
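
/*
 * sendmsg_maypush() is the wait condition for blocking senders: the
 * sender may continue once the connection has been reset, nothing is
 * waiting anymore, free buffer space has appeared, or a fresh
 * reserve_sock_buffer() attempt succeeds now that the limits have changed.
 */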
int cor_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len)
{
	__s64 copied = 0;

	struct conn *rconn = (struct conn *) sock->sk;

	int rc = check_conn_state(rconn);

	int flush = (msg->msg_flags & MSG_MORE) == 0;
	int blocking = (msg->msg_flags & MSG_DONTWAIT) == 0;

	__s32 bufferfree;
	__u64 max = (1LL << 32) - 1;
	__u32 totallen = (total_len > max ? max : total_len);

	if (unlikely(rc))
		return -EBADF;

recv:
	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		mutex_unlock(&sock_bufferlimits_lock);
		copied = -EPIPE;
		goto out;
	}

	reserve_sock_buffer(rconn, totallen);

	mutex_unlock(&sock_bufferlimits_lock);

	bufferfree = (__s64) rconn->source.sock.alloclimit +
			(__s64) rconn->data_buf.cpacket_buffer -
			(__s64) rconn->data_buf.totalsize -
			(__s64) rconn->data_buf.overhead;

	if (bufferfree <= 0) {
		if (copied == 0)
			copied = -EAGAIN;
		goto out;
	}

	copied = receive_userbuf(rconn, msg, bufferfree, bufferfree >=
			totallen ? 0 : (rconn->source.sock.alloclimit +
			rconn->data_buf.cpacket_buffer));

	if (0) {
out:
		bufferfree = (__s64) rconn->source.sock.alloclimit +
				(__s64) rconn->data_buf.cpacket_buffer -
				(__s64) rconn->data_buf.totalsize -
				(__s64) rconn->data_buf.overhead;
	}

	if (copied == -EAGAIN)
		rconn->source.sock.wait_len = totallen;
	else
		rconn->source.sock.wait_len = 0;

	mutex_unlock(&(rconn->rcv_lock));

	unreserve_sock_buffer(rconn);

	mutex_lock(&sock_bufferlimits_lock);
	mutex_lock(&(rconn->rcv_lock));

	if (flush == 0 && rconn->data_buf.totalsize + rconn->data_buf.overhead -
			rconn->data_buf.cpacket_buffer <
			(BUFFERLIMIT_SOCK_SOCK * 3) / 4) {
		if (rconn->source.sock.delay_flush == 0) {
			struct sock_buffertracker *sbt = rconn->source.sock.sbt;
			list_add_tail(&(rconn->source.sock.delflush_list),
					&(sbt->delflush_conns));
		}
		rconn->source.sock.delay_flush = 1;
	} else {
		if (rconn->source.sock.delay_flush) {
			list_del(&(rconn->source.sock.delflush_list));
		}
		rconn->source.sock.delay_flush = 0;
	}

	mutex_unlock(&(rconn->rcv_lock));
	mutex_unlock(&sock_bufferlimits_lock);

	if (likely(copied > 0 || bufferfree <= 0))
		flush_buf(rconn);

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(rconn->source.sock.wait,
				sendmsg_maypush(rconn)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	BUG_ON(copied > total_len);
	return copied;
}
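
/*
 * The delay_flush handling above implements MSG_MORE batching: while the
 * caller announces more data and less than 3/4 of BUFFERLIMIT_SOCK_SOCK
 * is buffered, the connection is parked on its tracker's delflush_conns
 * list instead of being flushed immediately; the out-of-buffer-space
 * worker forces these delayed flushes when space runs short.
 */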
static int cor_readytoread(struct conn *sconn)
{
	int rc = 0;
	mutex_lock(&(sconn->rcv_lock));
	rc = (sconn->data_buf.read_remaining != 0) ||
			unlikely(atomic_read(&(sconn->isreset)) != 0);
	mutex_unlock(&(sconn->rcv_lock));
	return rc;
}
int cor_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t total_len, int flags)
{
	struct conn *rconn = (struct conn *) sock->sk;
	struct conn *sconn = rconn->reversedir;
	ssize_t copied = 0; /* signed: also carries error codes */

	int rc = check_conn_state(rconn);

	int blocking = (flags & MSG_DONTWAIT) == 0;

	if (unlikely(rc))
		return -EBADF;

	BUG_ON(sconn == 0);

recv:
	mutex_lock(&(sconn->rcv_lock));

	if (unlikely(atomic_read(&(rconn->isreset)) != 0)) {
		copied = -EPIPE;
		goto out;
	}

	copied = databuf_pulluser(sconn, msg);
	databuf_ackread(sconn);

out:
	mutex_unlock(&(sconn->rcv_lock));

	if (likely(copied > 0)) {
		refresh_conn_credits(sconn, 0, 0);
		unreserve_sock_buffer(sconn);
		wake_sender(sconn);
	}

	if (copied == -EAGAIN && blocking) {
		if (wait_event_interruptible(sconn->target.sock.wait,
				cor_readytoread(sconn)) == 0)
			goto recv;
		copied = -ERESTARTSYS;
	}

	return copied;
}
const struct proto_ops cor_proto_ops = {
	.family = PF_COR,
	.owner = THIS_MODULE,
	.release = cor_socket_release,
	.bind = cor_socket_bind,
	.connect = cor_socket_connect,
	.accept = cor_socket_accept,
	.listen = cor_socket_listen,
	.shutdown = cor_socket_shutdown,
	.ioctl = cor_ioctl,
	.sendmsg = cor_sendmsg,
	.recvmsg = cor_recvmsg

	/*socketpair
	getname
	poll
	compat_ioctl
	setsockopt
	getsockopt
	compat_setsockopt
	compat_getsockopt
	mmap
	sendpage
	splice_read*/
};
int cor_createsock(struct net *net, struct socket *sock, int protocol)
{
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	sock->state = SS_UNCONNECTED;
	sock->ops = &cor_proto_ops;

	return 0;
}
static struct net_proto_family cor_net_proto_family = {
	.family = PF_COR,
	.create = cor_createsock,
	.owner = THIS_MODULE
};
static int __init cor_sock_init(void)
{
	INIT_WORK(&outofsockbufferspace_work, outofsockbufferspace);
	outofsockbufferspace_scheduled = 0;

	sock_register(&cor_net_proto_family);
	sock_bufferusage = 0;
	return 0;
}
module_init(cor_sock_init);

MODULE_LICENSE("GPL");