/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
22 #include <linux/net.h>
23 #include <asm/uaccess.h>
28 * sock_bt_wait_list and waiting_conns are ordered by min amount first, the
29 * order in which resuming will happen
31 DEFINE_MUTEX(sock_bufferlimits_lock
);
32 LIST_HEAD(sock_bt_list
);
33 LIST_HEAD(sock_bt_wait_list
);
34 static __u64 sock_bufferusage
;
36 static struct work_struct outofsockbufferspace_work
;
37 static int outofsockbufferspace_scheduled
;
39 static void free_sbt(struct kref
*ref
)
41 struct sock_buffertracker
*sbt
= container_of(ref
,
42 struct sock_buffertracker
, ref
);
44 BUG_ON(sbt
->usage
!= 0);
45 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
51 static struct sock_buffertracker
*get_sock_buffertracker(uid_t uid
)
53 struct sock_buffertracker
*sbt
;
54 struct list_head
*curr
;
56 curr
= sock_bt_list
.next
;
57 while (curr
!= &sock_bt_list
) {
58 sbt
= container_of(curr
, struct sock_buffertracker
, lh
);
59 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
65 curr
= sock_bt_wait_list
.next
;
66 while (curr
!= &sock_bt_wait_list
) {
67 sbt
= container_of(curr
, struct sock_buffertracker
, lh
);
68 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
74 sbt
= kmalloc(sizeof(struct sock_buffertracker
), GFP_KERNEL
);
76 memset(sbt
, 0, sizeof(struct sock_buffertracker
));
78 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
79 INIT_LIST_HEAD(&(sbt
->delflush_conns
));
80 INIT_LIST_HEAD(&(sbt
->waiting_conns
));
81 kref_init(&(sbt
->ref
));
86 kref_get(&(sbt
->ref
));
91 static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker
*sbt
,
92 int waitingconnremoved
)
94 if (waitingconnremoved
&& list_empty(&(sbt
->waiting_conns
))) {
96 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
100 if (list_empty(&(sbt
->waiting_conns
)))
103 while(sbt
->lh
.next
!= &sock_bt_wait_list
) {
104 struct sock_buffertracker
*next
= container_of(sbt
->lh
.next
,
105 struct sock_buffertracker
, lh
);
107 BUG_ON(sbt
->lh
.next
== &sock_bt_list
);
109 if (sbt
->usage
<= next
->usage
)
112 list_del(&(sbt
->lh
));
113 list_add(&(sbt
->lh
), &(next
->lh
));
117 static int oosbs_resumesbt(struct sock_buffertracker
*sbt
)
120 struct list_head
*curr
= sbt
->delflush_conns
.next
;
122 while (curr
!= &(sbt
->delflush_conns
)) {
123 struct conn
*src_in_o
= container_of(curr
, struct conn
,
124 source
.sock
.delflush_list
);
127 mutex_lock(&(src_in_o
->rcv_lock
));
129 BUG_ON(src_in_o
->sourcetype
!= SOURCE_SOCK
);
131 BUG_ON(src_in_o
->source
.sock
.delay_flush
== 0);
133 if (src_in_o
->data_buf
.read_remaining
!= 0) {
134 src_in_o
->source
.sock
.delay_flush
= 0;
135 list_del(&(src_in_o
->source
.sock
.delflush_list
));
139 mutex_unlock(&(src_in_o
->rcv_lock
));
144 kref_get(&(sbt
->ref
));
145 mutex_unlock(&sock_bufferlimits_lock
);
154 kref_put(&(sbt
->ref
), free_sbt
);
159 static void oosbs_global(void)
161 struct list_head
*curr
;
165 mutex_lock(&sock_bufferlimits_lock
);
168 curr
= sock_bt_list
.prev
;
169 while (curr
!= &sock_bt_list
) {
170 struct sock_buffertracker
*sbt
= container_of(curr
,
171 struct sock_buffertracker
, lh
);
172 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
173 if (oosbs_resumesbt(sbt
))
178 curr
= sock_bt_wait_list
.prev
;
179 while (curr
!= &sock_bt_wait_list
) {
180 struct sock_buffertracker
*sbt
= container_of(curr
,
181 struct sock_buffertracker
, lh
);
182 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
183 if (oosbs_resumesbt(sbt
))
189 static void oosbs_user(void)
191 struct list_head
*curr
;
195 mutex_lock(&sock_bufferlimits_lock
);
198 curr
= sock_bt_wait_list
.prev
;
199 while (curr
!= &sock_bt_wait_list
) {
200 struct sock_buffertracker
*sbt
= container_of(curr
,
201 struct sock_buffertracker
, lh
);
202 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
204 if (sbt
->usage
< (BUFFERLIMIT_SOCK_USER
* 3 / 4))
207 if (oosbs_resumesbt(sbt
))
213 static void outofsockbufferspace(struct work_struct
*work
)
215 mutex_lock(&sock_bufferlimits_lock
);
216 if (sock_bufferusage
< (BUFFERLIMIT_SOCK_GLOBAL
* 3 / 4)) {
218 if (sock_bufferusage
>= (BUFFERLIMIT_SOCK_GLOBAL
* 3 / 4))
224 outofsockbufferspace_scheduled
= 0;
225 mutex_unlock(&sock_bufferlimits_lock
);
228 static void _reserve_sock_buffer_inswl(struct conn
*src_in_l
)
230 struct sock_buffertracker
*sbt
= src_in_l
->source
.sock
.sbt
;
231 struct list_head
*curr
;
235 if (list_empty(&(sbt
->waiting_conns
)) == 0)
238 list_del(&(sbt
->lh
));
240 curr
= sock_bt_wait_list
.next
;
241 while (curr
!= &sock_bt_wait_list
) {
242 struct sock_buffertracker
*currsbt
= container_of(curr
,
243 struct sock_buffertracker
, lh
);
244 BUG_ON(list_empty(&(currsbt
->waiting_conns
)));
245 if (sbt
->usage
< currsbt
->usage
) {
246 list_add(&(sbt
->lh
), curr
);
252 list_add_tail(&(sbt
->lh
), &sock_bt_wait_list
);
255 curr
= sbt
->waiting_conns
.next
;
256 while (curr
!= &(sbt
->waiting_conns
)) {
257 struct conn
*currrconn
= container_of(curr
, struct conn
,
258 source
.sock
.alwait_list
);
259 BUG_ON(currrconn
->sourcetype
!= SOURCE_SOCK
);
260 if (src_in_l
->source
.sock
.alloclimit
<
261 currrconn
->source
.sock
.alloclimit
) {
262 list_add(&(src_in_l
->source
.sock
.alwait_list
), curr
);
268 list_add_tail(&(src_in_l
->source
.sock
.alwait_list
),
269 &(sbt
->waiting_conns
));
272 src_in_l
->source
.sock
.in_alwait_list
= 1;
274 if (outofsockbufferspace_scheduled
== 0) {
275 schedule_work(&outofsockbufferspace_work
);
276 outofsockbufferspace_scheduled
= 1;
280 static void reserve_sock_buffer(struct conn
*src_in_l
, __u64 amount
)
282 struct sock_buffertracker
*sbt
= src_in_l
->source
.sock
.sbt
;
283 struct sock_buffertracker
*first_wait_sbt
= list_empty(
284 &sock_bt_wait_list
) ? 0 : container_of(
285 sock_bt_wait_list
.next
, struct sock_buffertracker
, lh
);
287 __u32 max
= (1 << 30) - 1;
291 if (unlikely(amount
> max
))
294 amount
+= src_in_l
->data_buf
.totalsize
+ src_in_l
->data_buf
.overhead
-
295 src_in_l
->data_buf
.cpacket_buffer
;
297 if (unlikely(amount
> max
))
300 if (amount
> BUFFERLIMIT_SOCK_SOCK
)
301 amount
= BUFFERLIMIT_SOCK_SOCK
;
303 if (amount
<= src_in_l
->source
.sock
.alloclimit
)
306 if ((list_empty(&sock_bt_wait_list
) == 0 && first_wait_sbt
!= 0 &&
307 first_wait_sbt
!= sbt
&&
308 first_wait_sbt
->usage
<= sbt
->usage
) ||
309 amount
- src_in_l
->source
.sock
.alloclimit
>
310 BUFFERLIMIT_SOCK_USER
- sbt
->usage
||
311 amount
- src_in_l
->source
.sock
.alloclimit
>
312 BUFFERLIMIT_SOCK_GLOBAL
- sock_bufferusage
) {
313 _reserve_sock_buffer_inswl(src_in_l
);
315 int waitingconnremoved
= 0;
316 sbt
->usage
+= amount
- src_in_l
->source
.sock
.alloclimit
;
317 sock_bufferusage
+= amount
- src_in_l
->source
.sock
.alloclimit
;
318 src_in_l
->source
.sock
.alloclimit
= amount
;
320 if (src_in_l
->source
.sock
.in_alwait_list
){
321 list_del(&(src_in_l
->source
.sock
.alwait_list
));
322 src_in_l
->source
.sock
.in_alwait_list
= 0;
323 waitingconnremoved
= 1;
325 _reserve_sock_buffer_reord_bt(sbt
, waitingconnremoved
);
329 static int _resume_bufferwaiting_socks(struct sock_buffertracker
*sbt
)
333 while (list_empty(&(sbt
->waiting_conns
)) && failed
== 0) {
334 struct conn
*src_in_o
= container_of(sbt
->waiting_conns
.next
,
335 struct conn
, source
.sock
.alwait_list
);
336 mutex_lock(&(src_in_o
->rcv_lock
));
338 BUG_ON(src_in_o
->sourcetype
== SOURCE_SOCK
);
339 BUG_ON(src_in_o
->source
.sock
.in_alwait_list
== 0);
340 BUG_ON(src_in_o
->source
.sock
.wait_len
== 0);
342 reserve_sock_buffer(src_in_o
, src_in_o
->source
.sock
.wait_len
);
344 if (src_in_o
->source
.sock
.alloclimit
+
345 src_in_o
->data_buf
.cpacket_buffer
<=
346 src_in_o
->data_buf
.totalsize
+
347 src_in_o
->data_buf
.overhead
) {
352 wake_up_interruptible(&(src_in_o
->source
.sock
.wait
));
355 mutex_unlock(&(src_in_o
->rcv_lock
));
361 static void resume_bufferwaiting_socks(void)
363 struct list_head
*curr
= sock_bt_wait_list
.next
;
365 while (curr
!= &sock_bt_wait_list
) {
366 struct sock_buffertracker
*currsbt
= container_of(curr
,
367 struct sock_buffertracker
, lh
);
368 BUG_ON(list_empty(&(currsbt
->waiting_conns
)));
371 if (_resume_bufferwaiting_socks(currsbt
))
376 static void reorder_sock_bt_wait_list(struct sock_buffertracker
*sbt
)
378 if (list_empty(&(sbt
->waiting_conns
)))
381 while (sbt
->lh
.prev
!= &sock_bt_wait_list
) {
382 struct sock_buffertracker
*prevsbt
= container_of(sbt
->lh
.prev
,
383 struct sock_buffertracker
, lh
);
385 BUG_ON(sbt
->lh
.next
== &sock_bt_list
);
387 if (prevsbt
->usage
<= sbt
->usage
)
390 list_del(&(sbt
->lh
));
391 list_add_tail(&(sbt
->lh
), &(prevsbt
->lh
));
395 void connreset_sbt(struct conn
*cn
)
397 struct sock_buffertracker
*sbt
;
399 mutex_lock(&sock_bufferlimits_lock
);
400 mutex_lock(&(cn
->rcv_lock
));
402 if (cn
->sourcetype
!= SOURCE_SOCK
)
405 sbt
= cn
->source
.sock
.sbt
;
408 if (cn
->source
.sock
.in_alwait_list
) {
409 list_del(&(cn
->source
.sock
.alwait_list
));
410 cn
->source
.sock
.in_alwait_list
= 0;
412 if (list_empty(&(sbt
->waiting_conns
))) {
413 list_del(&(sbt
->lh
));
414 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
417 reorder_sock_bt_wait_list(sbt
);
420 sbt
->usage
-= cn
->source
.sock
.alloclimit
;
421 if (cn
->source
.sock
.delay_flush
) {
422 cn
->source
.sock
.delay_flush
= 0;
423 list_del(&(cn
->source
.sock
.delflush_list
));
425 kref_put(&(sbt
->ref
), free_sbt
);
426 cn
->source
.sock
.sbt
= 0;
429 mutex_unlock(&(cn
->rcv_lock
));
430 mutex_unlock(&sock_bufferlimits_lock
);
433 void unreserve_sock_buffer(struct conn
*cn
)
436 struct sock_buffertracker
*sbt
;
438 mutex_lock(&sock_bufferlimits_lock
);
439 mutex_lock(&(cn
->rcv_lock
));
441 if (cn
->sourcetype
!= SOURCE_SOCK
)
444 if (unlikely(atomic_read(&(cn
->isreset
)) != 0))
447 sbt
= cn
->source
.sock
.sbt
;
450 if (cn
->data_buf
.totalsize
+ cn
->data_buf
.overhead
<=
451 cn
->source
.sock
.alloclimit
+
452 cn
->data_buf
.cpacket_buffer
)
457 BUG_ON(cn
->source
.sock
.alloclimit
> sbt
->usage
);
458 BUG_ON(cn
->source
.sock
.alloclimit
> sock_bufferusage
);
459 BUG_ON(cn
->data_buf
.cpacket_buffer
> cn
->data_buf
.totalsize
+
460 cn
->data_buf
.overhead
);
462 sbt
->usage
-= cn
->source
.sock
.alloclimit
;
463 sbt
->usage
+= cn
->data_buf
.totalsize
;
464 sbt
->usage
+= cn
->data_buf
.overhead
;
465 sbt
->usage
-= cn
->data_buf
.cpacket_buffer
;
467 sock_bufferusage
-= cn
->source
.sock
.alloclimit
;
468 sock_bufferusage
+= cn
->data_buf
.totalsize
;
469 sock_bufferusage
+= cn
->data_buf
.overhead
;
470 sock_bufferusage
-= cn
->data_buf
.cpacket_buffer
;
472 cn
->source
.sock
.alloclimit
= cn
->data_buf
.totalsize
+
473 cn
->data_buf
.overhead
- cn
->data_buf
.cpacket_buffer
;
475 if (cn
->source
.sock
.alloclimit
== 0 &&
476 cn
->source
.sock
.in_alwait_list
) {
477 list_del(&(cn
->source
.sock
.alwait_list
));
478 cn
->source
.sock
.in_alwait_list
= 0;
480 if (list_empty(&(sbt
->waiting_conns
))) {
481 list_del(&(sbt
->lh
));
482 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
486 reorder_sock_bt_wait_list(sbt
);
489 mutex_unlock(&(cn
->rcv_lock
));
492 resume_bufferwaiting_socks();
494 mutex_unlock(&sock_bufferlimits_lock
);
498 static int check_connlistener_state(struct connlistener
*cl
)
500 if (likely(cl
!= 0 && cl
->sockstate
== SOCKSTATE_LISTENER
))
506 static int check_conn_state(struct conn
*cn
)
508 if (likely(cn
!= 0 && cn
->sockstate
== SOCKSTATE_CONN
))
514 int cor_socket_release(struct socket
*sock
)
516 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
517 struct conn
*src_in_o
= (struct conn
*) sock
->sk
;
522 if (cl
->sockstate
== SOCKSTATE_LISTENER
) {
524 } else if (src_in_o
->sockstate
== SOCKSTATE_CONN
) {
525 reset_conn(src_in_o
);
526 BUG_ON(src_in_o
->sourcetype
!= SOURCE_SOCK
);
527 kref_put(&(src_in_o
->ref
), free_conn
);
535 int cor_socket_bind(struct socket
*sock
, struct sockaddr
*myaddr
,
538 struct connlistener
*listener
;
539 struct cor_sockaddr
*addr
= (struct cor_sockaddr
*) myaddr
;
541 if (unlikely(sock
->sk
!= 0))
544 if (sockaddr_len
< sizeof(struct cor_sockaddr
))
547 if (addr
->type
!= SOCKADDRTYPE_PORT
)
550 listener
= open_port(addr
->addr
.port
);
555 sock
->sk
= (struct sock
*) listener
;
560 int cor_socket_connect(struct socket
*sock
, struct sockaddr
*vaddr
,
561 int sockaddr_len
, int flags
)
563 struct sock_buffertracker
*sbt
;
565 struct conn
*src_sock
;
567 if (unlikely(sock
->sk
!= 0))
570 src_sock
= alloc_conn(GFP_KERNEL
);
572 if (unlikely(src_sock
== 0))
575 src_sock
->is_client
= 1;
577 mutex_lock(&sock_bufferlimits_lock
);
578 sbt
= get_sock_buffertracker(current_uid());
579 mutex_unlock(&sock_bufferlimits_lock
);
581 if (unlikely(sbt
== 0)) {
582 reset_conn(src_sock
);
586 kref_get(&(src_sock
->ref
));
588 mutex_lock(&(src_sock
->rcv_lock
));
589 mutex_lock(&(src_sock
->reversedir
->rcv_lock
));
590 conn_init_sock_source(src_sock
);
591 src_sock
->source
.sock
.sbt
= sbt
;
592 conn_init_sock_target(src_sock
->reversedir
);
593 mutex_unlock(&(src_sock
->reversedir
->rcv_lock
));
594 mutex_unlock(&(src_sock
->rcv_lock
));
596 sock
->sk
= (struct sock
*) src_sock
;
597 sock
->state
= SS_CONNECTED
;
602 static int cor_rdytoaccept(struct connlistener
*cl
)
605 mutex_lock(&(cl
->lock
));
606 rc
= (list_empty(&(cl
->conn_queue
)) == 0);
607 mutex_unlock(&(cl
->lock
));
611 const struct proto_ops cor_proto_ops
;
613 int cor_socket_accept(struct socket
*sock
, struct socket
*newsock
, int flags
)
615 struct sock_buffertracker
*sbt
;
617 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
619 int rc
= check_connlistener_state(cl
);
621 struct conn
*src_sock_o
;
626 mutex_lock(&sock_bufferlimits_lock
);
627 sbt
= get_sock_buffertracker(current_uid());
628 mutex_unlock(&sock_bufferlimits_lock
);
630 if (unlikely(sbt
== 0))
633 mutex_lock(&(cl
->lock
));
635 if (unlikely(cl
->queue_maxlen
<= 0)) {
636 mutex_unlock(&(cl
->lock
));
640 while (list_empty(&(cl
->conn_queue
))) {
641 mutex_unlock(&(cl
->lock
));
642 if (wait_event_interruptible(cl
->wait
, cor_rdytoaccept(cl
))) {
643 kref_put(&(sbt
->ref
), free_sbt
);
646 mutex_lock(&(cl
->lock
));
649 src_sock_o
= container_of(cl
->conn_queue
.next
, struct conn
,
650 source
.sock
.cl_list
);
652 BUG_ON(src_sock_o
->sourcetype
!= SOURCE_SOCK
);
654 list_del(cl
->conn_queue
.next
);
658 mutex_unlock(&(cl
->lock
));
660 mutex_lock(&(src_sock_o
->rcv_lock
));
661 src_sock_o
->source
.sock
.sbt
= sbt
;
662 mutex_unlock(&(src_sock_o
->rcv_lock
));
664 newsock
->ops
= &cor_proto_ops
;
665 newsock
->sk
= (struct sock
*) src_sock_o
;
666 newsock
->state
= SS_CONNECTED
;
671 int cor_socket_listen(struct socket
*sock
, int len
)
673 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
675 int rc
= check_connlistener_state(cl
);
680 mutex_lock(&(cl
->lock
));
681 cl
->queue_maxlen
= len
;
682 mutex_unlock(&(cl
->lock
));
687 int cor_socket_shutdown(struct socket
*sock
, int flags
)
692 int cor_ioctl(struct socket
*sock
, unsigned int cmd
, unsigned long arg
)
697 static int sendmsg_maypush(struct conn
*src_sock_o
)
700 mutex_lock(&sock_bufferlimits_lock
);
701 mutex_lock(&(src_sock_o
->rcv_lock
));
702 if (unlikely(atomic_read(&(src_sock_o
->isreset
)) != 0)) {
704 } else if (src_sock_o
->source
.sock
.wait_len
== 0) {
706 } else if (src_sock_o
->source
.sock
.alloclimit
+
707 src_sock_o
->data_buf
.cpacket_buffer
>
708 src_sock_o
->data_buf
.totalsize
+
709 src_sock_o
->data_buf
.overhead
) {
712 reserve_sock_buffer(src_sock_o
,
713 src_sock_o
->source
.sock
.wait_len
);
714 if (src_sock_o
->source
.sock
.alloclimit
+
715 src_sock_o
->data_buf
.cpacket_buffer
>
716 src_sock_o
->data_buf
.totalsize
+
717 src_sock_o
->data_buf
.overhead
)
720 mutex_unlock(&(src_sock_o
->rcv_lock
));
721 mutex_unlock(&sock_bufferlimits_lock
);
725 int cor_sendmsg(struct kiocb
*iocb
, struct socket
*sock
, struct msghdr
*msg
,
730 struct conn
*src_sock_o
= (struct conn
*) sock
->sk
;
732 int rc
= check_conn_state(src_sock_o
);
734 int flush
= (msg
->msg_flags
& MSG_MORE
) == 0;
735 int blocking
= (msg
->msg_flags
& MSG_DONTWAIT
) == 0;
738 __u64 max
= (1LL << 32) - 1;
739 __u32 totallen
= (total_len
> max
? max
: total_len
);
745 mutex_lock(&sock_bufferlimits_lock
);
746 mutex_lock(&(src_sock_o
->rcv_lock
));
748 if (unlikely(atomic_read(&(src_sock_o
->isreset
)) != 0)) {
749 mutex_lock(&(src_sock_o
->rcv_lock
));
750 mutex_unlock(&sock_bufferlimits_lock
);
754 reserve_sock_buffer(src_sock_o
, totallen
);
756 bufferfree
= (__s64
) src_sock_o
->source
.sock
.alloclimit
+
757 (__s64
) src_sock_o
->data_buf
.cpacket_buffer
-
758 (__s64
) src_sock_o
->data_buf
.totalsize
-
759 (__s64
) src_sock_o
->data_buf
.overhead
;
761 mutex_unlock(&sock_bufferlimits_lock
);
763 if (bufferfree
<= 0) {
769 copied
= receive_userbuf(src_sock_o
, msg
, bufferfree
, bufferfree
>=
770 totallen
? 0 : (src_sock_o
->source
.sock
.alloclimit
+
771 src_sock_o
->data_buf
.cpacket_buffer
));
776 bufferfree
= (__s64
) src_sock_o
->source
.sock
.alloclimit
+
777 (__s64
) src_sock_o
->data_buf
.cpacket_buffer
-
778 (__s64
) src_sock_o
->data_buf
.totalsize
-
779 (__s64
) src_sock_o
->data_buf
.overhead
;
782 if (copied
== -EAGAIN
)
783 src_sock_o
->source
.sock
.wait_len
= totallen
;
785 src_sock_o
->source
.sock
.wait_len
= 0;
787 mutex_unlock(&(src_sock_o
->rcv_lock
));
789 unreserve_sock_buffer(src_sock_o
);
791 mutex_lock(&sock_bufferlimits_lock
);
792 mutex_lock(&(src_sock_o
->rcv_lock
));
794 if (unlikely(atomic_read(&(src_sock_o
->isreset
)) != 0)) {
795 mutex_lock(&(src_sock_o
->rcv_lock
));
796 mutex_unlock(&sock_bufferlimits_lock
);
800 if (flush
== 0 && src_sock_o
->data_buf
.totalsize
+
801 src_sock_o
->data_buf
.overhead
-
802 src_sock_o
->data_buf
.cpacket_buffer
<
803 (BUFFERLIMIT_SOCK_SOCK
*3)/4) {
804 if (src_sock_o
->source
.sock
.delay_flush
== 0) {
805 struct sock_buffertracker
*sbt
=
806 src_sock_o
->source
.sock
.sbt
;
808 list_add_tail(&(src_sock_o
->source
.sock
.delflush_list
),
809 &(sbt
->delflush_conns
));
811 src_sock_o
->source
.sock
.delay_flush
= 1;
813 if (src_sock_o
->source
.sock
.delay_flush
) {
814 list_del(&(src_sock_o
->source
.sock
.delflush_list
));
816 src_sock_o
->source
.sock
.delay_flush
= 0;
819 mutex_unlock(&(src_sock_o
->rcv_lock
));
820 mutex_unlock(&sock_bufferlimits_lock
);
822 if (likely(copied
> 0 || bufferfree
<= 0))
823 flush_buf(src_sock_o
);
825 if (copied
== -EAGAIN
&& blocking
) {
826 if (wait_event_interruptible(src_sock_o
->source
.sock
.wait
,
827 sendmsg_maypush(src_sock_o
)) == 0)
829 copied
= -ERESTARTSYS
;
832 BUG_ON(copied
> total_len
);
836 static int cor_readytoread(struct conn
*trgt_sock_o
)
839 mutex_lock(&(trgt_sock_o
->rcv_lock
));
840 rc
= (trgt_sock_o
->data_buf
.read_remaining
!= 0) ||
841 unlikely(atomic_read(&(trgt_sock_o
->isreset
)) != 0);
842 mutex_unlock(&(trgt_sock_o
->rcv_lock
));
846 int cor_recvmsg(struct kiocb
*iocb
, struct socket
*sock
, struct msghdr
*msg
,
847 size_t total_len
, int flags
)
849 struct conn
*src_sock_o
= (struct conn
*) sock
->sk
;
850 struct conn
*trgt_sock_o
;
853 int rc
= check_conn_state(src_sock_o
);
855 int blocking
= (flags
& MSG_DONTWAIT
) == 0;
860 trgt_sock_o
= src_sock_o
->reversedir
;
862 BUG_ON(trgt_sock_o
== 0);
865 mutex_lock(&(trgt_sock_o
->rcv_lock
));
867 if (unlikely(atomic_read(&(trgt_sock_o
->isreset
)) != 0)) {
872 copied
= databuf_pulluser(trgt_sock_o
, msg
);
873 databuf_ackread(trgt_sock_o
);
876 mutex_unlock(&(trgt_sock_o
->rcv_lock
));
878 if (likely(copied
> 0)) {
879 refresh_conn_credits(trgt_sock_o
, 0, 0);
880 #warning todo move unreserve calls to wake_sender
881 unreserve_sock_buffer(trgt_sock_o
);
882 wake_sender(trgt_sock_o
);
886 if (copied
== -EAGAIN
&& blocking
) {
887 if (wait_event_interruptible(trgt_sock_o
->target
.sock
.wait
,
888 cor_readytoread(trgt_sock_o
)) == 0)
890 copied
= -ERESTARTSYS
;
896 const struct proto_ops cor_proto_ops
= {
898 .owner
= THIS_MODULE
,
899 .release
= cor_socket_release
,
900 .bind
= cor_socket_bind
,
901 .connect
= cor_socket_connect
,
902 .accept
= cor_socket_accept
,
903 .listen
= cor_socket_listen
,
904 .shutdown
= cor_socket_shutdown
,
906 .sendmsg
= cor_sendmsg
,
907 .recvmsg
= cor_recvmsg
922 int cor_createsock(struct net
*net
, struct socket
*sock
, int protocol
)
924 if (unlikely(protocol
!= 0))
925 return -EPROTONOSUPPORT
;
927 sock
->state
= SS_UNCONNECTED
;
928 sock
->ops
= &cor_proto_ops
;
933 static struct net_proto_family cor_net_proto_family
= {
935 .create
= cor_createsock
,
939 static int __init
cor_sock_init(void)
941 INIT_WORK(&outofsockbufferspace_work
, outofsockbufferspace
);
942 outofsockbufferspace_scheduled
= 0;
944 sock_register(&cor_net_proto_family
);
945 sock_bufferusage
= 0;
949 module_init(cor_sock_init
);
951 MODULE_LICENSE("GPL");