/*
 * Connection oriented routing
 * Copyright (C) 2007-2011 Michael Blizek
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */
22 #include <linux/net.h>
23 #include <asm/uaccess.h>
28 * sock_bt_wait_list and waiting_conns are ordered by min amount first, the
29 * order in which resuming will happen
31 DEFINE_MUTEX(sock_bufferlimits_lock
);
32 LIST_HEAD(sock_bt_list
);
33 LIST_HEAD(sock_bt_wait_list
);
34 static __u64 sock_bufferusage
;
36 static struct work_struct outofsockbufferspace_work
;
37 static int outofsockbufferspace_scheduled
;
39 void free_sbt(struct kref
*ref
)
41 struct sock_buffertracker
*sbt
= container_of(ref
,
42 struct sock_buffertracker
, ref
);
44 BUG_ON(sbt
->usage
!= 0);
45 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
51 static struct sock_buffertracker
*get_sock_buffertracker(uid_t uid
)
53 struct sock_buffertracker
*sbt
;
54 struct list_head
*curr
;
56 curr
= sock_bt_list
.next
;
57 while (curr
!= &sock_bt_list
) {
58 sbt
= container_of(curr
, struct sock_buffertracker
, lh
);
59 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
65 curr
= sock_bt_wait_list
.next
;
66 while (curr
!= &sock_bt_wait_list
) {
67 sbt
= container_of(curr
, struct sock_buffertracker
, lh
);
68 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
74 sbt
= kmalloc(sizeof(struct sock_buffertracker
), GFP_KERNEL
);
76 memset(sbt
, 0, sizeof(struct sock_buffertracker
));
78 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
79 INIT_LIST_HEAD(&(sbt
->delflush_conns
));
80 INIT_LIST_HEAD(&(sbt
->waiting_conns
));
81 kref_init(&(sbt
->ref
));
86 kref_get(&(sbt
->ref
));
91 static void _reserve_sock_buffer_reord_bt(struct sock_buffertracker
*sbt
,
92 int waitingconnremoved
)
94 if (waitingconnremoved
&& list_empty(&(sbt
->waiting_conns
))) {
96 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
100 if (list_empty(&(sbt
->waiting_conns
)))
103 while(sbt
->lh
.next
!= &sock_bt_wait_list
) {
104 struct sock_buffertracker
*next
= container_of(sbt
->lh
.next
,
105 struct sock_buffertracker
, lh
);
107 BUG_ON(sbt
->lh
.next
== &sock_bt_list
);
109 if (sbt
->usage
<= next
->usage
)
112 list_del(&(sbt
->lh
));
113 list_add(&(sbt
->lh
), &(next
->lh
));
117 static int oosbs_resumesbt(struct sock_buffertracker
*sbt
)
120 struct list_head
*curr
= sbt
->delflush_conns
.next
;
122 while (curr
!= &(sbt
->delflush_conns
)) {
123 struct conn
*src_in_o
= container_of(curr
, struct conn
,
124 source
.sock
.delflush_list
);
127 mutex_lock(&(src_in_o
->rcv_lock
));
129 BUG_ON(src_in_o
->sourcetype
!= SOURCE_SOCK
);
131 BUG_ON(src_in_o
->source
.sock
.delay_flush
== 0);
133 if (src_in_o
->data_buf
.read_remaining
!= 0) {
134 src_in_o
->source
.sock
.delay_flush
= 0;
135 list_del(&(src_in_o
->source
.sock
.delflush_list
));
139 mutex_unlock(&(src_in_o
->rcv_lock
));
144 kref_get(&(sbt
->ref
));
145 mutex_unlock(&sock_bufferlimits_lock
);
154 kref_put(&(sbt
->ref
), free_sbt
);
159 static void oosbs_global(void)
161 struct list_head
*curr
;
165 mutex_lock(&sock_bufferlimits_lock
);
168 curr
= sock_bt_list
.prev
;
169 while (curr
!= &sock_bt_list
) {
170 struct sock_buffertracker
*sbt
= container_of(curr
,
171 struct sock_buffertracker
, lh
);
172 BUG_ON(list_empty(&(sbt
->waiting_conns
)) == 0);
173 if (oosbs_resumesbt(sbt
))
178 curr
= sock_bt_wait_list
.prev
;
179 while (curr
!= &sock_bt_wait_list
) {
180 struct sock_buffertracker
*sbt
= container_of(curr
,
181 struct sock_buffertracker
, lh
);
182 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
183 if (oosbs_resumesbt(sbt
))
189 static void oosbs_user(void)
191 struct list_head
*curr
;
195 mutex_lock(&sock_bufferlimits_lock
);
198 curr
= sock_bt_wait_list
.prev
;
199 while (curr
!= &sock_bt_wait_list
) {
200 struct sock_buffertracker
*sbt
= container_of(curr
,
201 struct sock_buffertracker
, lh
);
202 BUG_ON(list_empty(&(sbt
->waiting_conns
)));
204 if (sbt
->usage
< (BUFFERLIMIT_SOCK_USER
* 3 / 4))
207 if (oosbs_resumesbt(sbt
))
213 static void outofsockbufferspace(struct work_struct
*work
)
215 mutex_lock(&sock_bufferlimits_lock
);
216 if (sock_bufferusage
< (BUFFERLIMIT_SOCK_GLOBAL
* 3 / 4)) {
218 if (sock_bufferusage
>= (BUFFERLIMIT_SOCK_GLOBAL
* 3 / 4))
224 outofsockbufferspace_scheduled
= 0;
225 mutex_unlock(&sock_bufferlimits_lock
);
228 static void _reserve_sock_buffer_inswl(struct conn
*src_in_l
)
230 struct sock_buffertracker
*sbt
= src_in_l
->source
.sock
.sbt
;
231 struct list_head
*curr
;
235 if (list_empty(&(sbt
->waiting_conns
)) == 0)
238 list_del(&(sbt
->lh
));
240 curr
= sock_bt_wait_list
.next
;
241 while (curr
!= &sock_bt_wait_list
) {
242 struct sock_buffertracker
*currsbt
= container_of(curr
,
243 struct sock_buffertracker
, lh
);
244 BUG_ON(list_empty(&(currsbt
->waiting_conns
)));
245 if (sbt
->usage
< currsbt
->usage
) {
246 list_add(&(sbt
->lh
), curr
);
252 list_add_tail(&(sbt
->lh
), &sock_bt_wait_list
);
255 curr
= sbt
->waiting_conns
.next
;
256 while (curr
!= &(sbt
->waiting_conns
)) {
257 struct conn
*currrconn
= container_of(curr
, struct conn
,
258 source
.sock
.alwait_list
);
259 BUG_ON(currrconn
->sourcetype
!= SOURCE_SOCK
);
260 if (src_in_l
->source
.sock
.alloclimit
<
261 currrconn
->source
.sock
.alloclimit
) {
262 list_add(&(src_in_l
->source
.sock
.alwait_list
), curr
);
268 list_add_tail(&(src_in_l
->source
.sock
.alwait_list
),
269 &(sbt
->waiting_conns
));
272 src_in_l
->source
.sock
.in_alwait_list
= 1;
274 if (outofsockbufferspace_scheduled
== 0) {
275 schedule_work(&outofsockbufferspace_work
);
276 outofsockbufferspace_scheduled
= 1;
280 static void reserve_sock_buffer(struct conn
*src_in_l
, __u64 amount
)
282 struct sock_buffertracker
*sbt
= src_in_l
->source
.sock
.sbt
;
283 struct sock_buffertracker
*first_wait_sbt
= list_empty(
284 &sock_bt_wait_list
) ? 0 : container_of(
285 sock_bt_wait_list
.next
, struct sock_buffertracker
, lh
);
287 __u32 max
= (1 << 30) - 1;
291 if (unlikely(amount
> max
))
294 amount
+= src_in_l
->data_buf
.totalsize
+ src_in_l
->data_buf
.overhead
-
295 src_in_l
->data_buf
.cpacket_buffer
;
297 if (unlikely(amount
> max
))
300 if (amount
> BUFFERLIMIT_SOCK_SOCK
)
301 amount
= BUFFERLIMIT_SOCK_SOCK
;
303 if (amount
<= src_in_l
->source
.sock
.alloclimit
)
306 if ((list_empty(&sock_bt_wait_list
) == 0 && first_wait_sbt
!= 0 &&
307 first_wait_sbt
!= sbt
&&
308 first_wait_sbt
->usage
<= sbt
->usage
) ||
309 amount
- src_in_l
->source
.sock
.alloclimit
>
310 BUFFERLIMIT_SOCK_USER
- sbt
->usage
||
311 amount
- src_in_l
->source
.sock
.alloclimit
>
312 BUFFERLIMIT_SOCK_GLOBAL
- sock_bufferusage
) {
313 _reserve_sock_buffer_inswl(src_in_l
);
315 int waitingconnremoved
= 0;
316 sbt
->usage
+= amount
- src_in_l
->source
.sock
.alloclimit
;
317 sock_bufferusage
+= amount
- src_in_l
->source
.sock
.alloclimit
;
318 src_in_l
->source
.sock
.alloclimit
= amount
;
320 if (src_in_l
->source
.sock
.in_alwait_list
){
321 list_del(&(src_in_l
->source
.sock
.alwait_list
));
322 src_in_l
->source
.sock
.in_alwait_list
= 0;
323 waitingconnremoved
= 1;
325 _reserve_sock_buffer_reord_bt(sbt
, waitingconnremoved
);
329 static int _resume_bufferwaiting_socks(struct sock_buffertracker
*sbt
)
333 while (list_empty(&(sbt
->waiting_conns
)) && failed
== 0) {
334 struct conn
*src_in_o
= container_of(sbt
->waiting_conns
.next
,
335 struct conn
, source
.sock
.alwait_list
);
336 mutex_lock(&(src_in_o
->rcv_lock
));
338 BUG_ON(src_in_o
->sourcetype
== SOURCE_SOCK
);
339 BUG_ON(src_in_o
->source
.sock
.in_alwait_list
== 0);
340 BUG_ON(src_in_o
->source
.sock
.wait_len
== 0);
342 reserve_sock_buffer(src_in_o
, src_in_o
->source
.sock
.wait_len
);
344 if (src_in_o
->source
.sock
.alloclimit
+
345 src_in_o
->data_buf
.cpacket_buffer
<=
346 src_in_o
->data_buf
.totalsize
+
347 src_in_o
->data_buf
.overhead
) {
352 wake_up_interruptible(&(src_in_o
->source
.sock
.wait
));
355 mutex_unlock(&(src_in_o
->rcv_lock
));
361 static void resume_bufferwaiting_socks(void)
363 struct list_head
*curr
= sock_bt_wait_list
.next
;
365 while (curr
!= &sock_bt_wait_list
) {
366 struct sock_buffertracker
*currsbt
= container_of(curr
,
367 struct sock_buffertracker
, lh
);
368 BUG_ON(list_empty(&(currsbt
->waiting_conns
)));
371 if (_resume_bufferwaiting_socks(currsbt
))
376 void unreserve_sock_buffer(struct conn
*cn
)
379 struct sock_buffertracker
*sbt
;
381 mutex_lock(&sock_bufferlimits_lock
);
382 mutex_lock(&(cn
->rcv_lock
));
384 if (cn
->sourcetype
!= SOURCE_SOCK
)
387 sbt
= cn
->source
.sock
.sbt
;
390 if (cn
->data_buf
.totalsize
+ cn
->data_buf
.overhead
<=
391 cn
->source
.sock
.alloclimit
+
392 cn
->data_buf
.cpacket_buffer
)
397 BUG_ON(cn
->source
.sock
.alloclimit
> sbt
->usage
);
398 BUG_ON(cn
->source
.sock
.alloclimit
> sock_bufferusage
);
399 BUG_ON(cn
->data_buf
.cpacket_buffer
> cn
->data_buf
.totalsize
+
400 cn
->data_buf
.overhead
);
402 sbt
->usage
-= cn
->source
.sock
.alloclimit
;
403 sbt
->usage
+= cn
->data_buf
.totalsize
;
404 sbt
->usage
+= cn
->data_buf
.overhead
;
405 sbt
->usage
-= cn
->data_buf
.cpacket_buffer
;
407 sock_bufferusage
-= cn
->source
.sock
.alloclimit
;
408 sock_bufferusage
+= cn
->data_buf
.totalsize
;
409 sock_bufferusage
+= cn
->data_buf
.overhead
;
410 sock_bufferusage
-= cn
->data_buf
.cpacket_buffer
;
412 cn
->source
.sock
.alloclimit
= cn
->data_buf
.totalsize
+
413 cn
->data_buf
.overhead
- cn
->data_buf
.cpacket_buffer
;
415 if (cn
->source
.sock
.alloclimit
== 0 &&
416 cn
->source
.sock
.in_alwait_list
) {
417 list_del(&(cn
->source
.sock
.alwait_list
));
418 cn
->source
.sock
.in_alwait_list
= 0;
420 if (list_empty(&(sbt
->waiting_conns
))) {
421 list_del(&(sbt
->lh
));
422 list_add_tail(&(sbt
->lh
), &sock_bt_list
);
426 if (list_empty(&(sbt
->waiting_conns
)))
429 while (sbt
->lh
.prev
!= &sock_bt_wait_list
) {
430 struct sock_buffertracker
*prevsbt
= container_of(sbt
->lh
.prev
,
431 struct sock_buffertracker
, lh
);
433 BUG_ON(sbt
->lh
.next
== &sock_bt_list
);
435 if (prevsbt
->usage
<= sbt
->usage
)
438 list_del(&(sbt
->lh
));
439 list_add_tail(&(sbt
->lh
), &(prevsbt
->lh
));
443 mutex_unlock(&(cn
->rcv_lock
));
446 resume_bufferwaiting_socks();
448 mutex_unlock(&sock_bufferlimits_lock
);
452 static int check_connlistener_state(struct connlistener
*cl
)
454 if (likely(cl
!= 0 && cl
->sockstate
== SOCKSTATE_LISTENER
))
460 static int check_conn_state(struct conn
*cn
)
462 if (likely(cn
!= 0 && cn
->sockstate
== SOCKSTATE_CONN
))
468 int cor_socket_release(struct socket
*sock
)
470 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
471 struct conn
*src_in_o
= (struct conn
*) sock
->sk
;
476 if (cl
->sockstate
== SOCKSTATE_LISTENER
) {
478 } else if (src_in_o
->sockstate
== SOCKSTATE_CONN
) {
479 reset_conn(src_in_o
);
480 BUG_ON(src_in_o
->sourcetype
!= SOURCE_SOCK
);
481 kref_put(&(src_in_o
->ref
), free_conn
);
489 int cor_socket_bind(struct socket
*sock
, struct sockaddr
*myaddr
,
492 struct connlistener
*listener
;
493 struct cor_sockaddr
*addr
= (struct cor_sockaddr
*) myaddr
;
495 if (unlikely(sock
->sk
!= 0))
498 if (sockaddr_len
< sizeof(struct cor_sockaddr
))
501 if (addr
->type
!= SOCKADDRTYPE_PORT
)
504 listener
= open_port(addr
->addr
.port
);
509 sock
->sk
= (struct sock
*) listener
;
514 int cor_socket_connect(struct socket
*sock
, struct sockaddr
*vaddr
,
515 int sockaddr_len
, int flags
)
517 struct sock_buffertracker
*sbt
;
519 struct conn
*src_sock
;
521 if (unlikely(sock
->sk
!= 0))
524 src_sock
= alloc_conn(GFP_KERNEL
);
526 if (unlikely(src_sock
== 0))
529 src_sock
->is_client
= 1;
531 mutex_lock(&sock_bufferlimits_lock
);
532 sbt
= get_sock_buffertracker(current_uid());
533 mutex_unlock(&sock_bufferlimits_lock
);
535 if (unlikely(sbt
== 0)) {
536 reset_conn(src_sock
);
540 kref_get(&(src_sock
->ref
));
542 mutex_lock(&(src_sock
->rcv_lock
));
543 mutex_lock(&(src_sock
->reversedir
->rcv_lock
));
544 conn_init_sock_source(src_sock
);
545 src_sock
->source
.sock
.sbt
= sbt
;
546 conn_init_sock_target(src_sock
->reversedir
);
547 mutex_unlock(&(src_sock
->reversedir
->rcv_lock
));
548 mutex_unlock(&(src_sock
->rcv_lock
));
550 sock
->sk
= (struct sock
*) src_sock
;
551 sock
->state
= SS_CONNECTED
;
556 static int cor_rdytoaccept(struct connlistener
*cl
)
559 mutex_lock(&(cl
->lock
));
560 rc
= (list_empty(&(cl
->conn_queue
)) == 0);
561 mutex_unlock(&(cl
->lock
));
565 const struct proto_ops cor_proto_ops
;
567 int cor_socket_accept(struct socket
*sock
, struct socket
*newsock
, int flags
)
569 struct sock_buffertracker
*sbt
;
571 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
573 int rc
= check_connlistener_state(cl
);
575 struct conn
*src_sock_o
;
580 mutex_lock(&sock_bufferlimits_lock
);
581 sbt
= get_sock_buffertracker(current_uid());
582 mutex_unlock(&sock_bufferlimits_lock
);
584 if (unlikely(sbt
== 0))
587 mutex_lock(&(cl
->lock
));
589 if (unlikely(cl
->queue_maxlen
<= 0)) {
590 mutex_unlock(&(cl
->lock
));
594 while (list_empty(&(cl
->conn_queue
))) {
595 mutex_unlock(&(cl
->lock
));
596 if (wait_event_interruptible(cl
->wait
, cor_rdytoaccept(cl
))) {
597 kref_put(&(sbt
->ref
), free_sbt
);
600 mutex_lock(&(cl
->lock
));
603 src_sock_o
= container_of(cl
->conn_queue
.next
, struct conn
,
604 source
.sock
.cl_list
);
606 BUG_ON(src_sock_o
->sourcetype
!= SOURCE_SOCK
);
608 list_del(cl
->conn_queue
.next
);
612 mutex_unlock(&(cl
->lock
));
614 mutex_lock(&(src_sock_o
->rcv_lock
));
615 src_sock_o
->source
.sock
.sbt
= sbt
;
616 mutex_unlock(&(src_sock_o
->rcv_lock
));
618 newsock
->ops
= &cor_proto_ops
;
619 newsock
->sk
= (struct sock
*) src_sock_o
;
620 newsock
->state
= SS_CONNECTED
;
625 int cor_socket_listen(struct socket
*sock
, int len
)
627 struct connlistener
*cl
= (struct connlistener
*) sock
->sk
;
629 int rc
= check_connlistener_state(cl
);
634 mutex_lock(&(cl
->lock
));
635 cl
->queue_maxlen
= len
;
636 mutex_unlock(&(cl
->lock
));
641 int cor_socket_shutdown(struct socket
*sock
, int flags
)
646 int cor_ioctl(struct socket
*sock
, unsigned int cmd
, unsigned long arg
)
651 static int sendmsg_maypush(struct conn
*src_sock_o
)
654 mutex_lock(&sock_bufferlimits_lock
);
655 mutex_lock(&(src_sock_o
->rcv_lock
));
656 if (unlikely(atomic_read(&(src_sock_o
->isreset
)) != 0)) {
658 } else if (src_sock_o
->source
.sock
.wait_len
== 0) {
660 } else if (src_sock_o
->source
.sock
.alloclimit
+
661 src_sock_o
->data_buf
.cpacket_buffer
>
662 src_sock_o
->data_buf
.totalsize
+
663 src_sock_o
->data_buf
.overhead
) {
666 reserve_sock_buffer(src_sock_o
,
667 src_sock_o
->source
.sock
.wait_len
);
668 if (src_sock_o
->source
.sock
.alloclimit
+
669 src_sock_o
->data_buf
.cpacket_buffer
>
670 src_sock_o
->data_buf
.totalsize
+
671 src_sock_o
->data_buf
.overhead
)
674 mutex_unlock(&(src_sock_o
->rcv_lock
));
675 mutex_unlock(&sock_bufferlimits_lock
);
679 int cor_sendmsg(struct kiocb
*iocb
, struct socket
*sock
, struct msghdr
*msg
,
684 struct conn
*src_sock_o
= (struct conn
*) sock
->sk
;
686 int rc
= check_conn_state(src_sock_o
);
688 int flush
= (msg
->msg_flags
& MSG_MORE
) == 0;
689 int blocking
= (msg
->msg_flags
& MSG_DONTWAIT
) == 0;
692 __u64 max
= (1LL << 32) - 1;
693 __u32 totallen
= (total_len
> max
? max
: total_len
);
699 mutex_lock(&sock_bufferlimits_lock
);
700 mutex_lock(&(src_sock_o
->rcv_lock
));
702 if (unlikely(atomic_read(&(src_sock_o
->isreset
)) != 0)) {
703 mutex_unlock(&sock_bufferlimits_lock
);
708 reserve_sock_buffer(src_sock_o
, totallen
);
710 bufferfree
= (__s64
) src_sock_o
->source
.sock
.alloclimit
+
711 (__s64
) src_sock_o
->data_buf
.cpacket_buffer
-
712 (__s64
) src_sock_o
->data_buf
.totalsize
-
713 (__s64
) src_sock_o
->data_buf
.overhead
;
715 mutex_unlock(&sock_bufferlimits_lock
);
717 if (bufferfree
<= 0) {
723 copied
= receive_userbuf(src_sock_o
, msg
, bufferfree
, bufferfree
>=
724 totallen
? 0 : (src_sock_o
->source
.sock
.alloclimit
+
725 src_sock_o
->data_buf
.cpacket_buffer
));
730 bufferfree
= (__s64
) src_sock_o
->source
.sock
.alloclimit
+
731 (__s64
) src_sock_o
->data_buf
.cpacket_buffer
-
732 (__s64
) src_sock_o
->data_buf
.totalsize
-
733 (__s64
) src_sock_o
->data_buf
.overhead
;
736 if (copied
== -EAGAIN
)
737 src_sock_o
->source
.sock
.wait_len
= totallen
;
739 src_sock_o
->source
.sock
.wait_len
= 0;
741 mutex_unlock(&(src_sock_o
->rcv_lock
));
743 unreserve_sock_buffer(src_sock_o
);
745 mutex_lock(&sock_bufferlimits_lock
);
746 mutex_lock(&(src_sock_o
->rcv_lock
));
748 if (flush
== 0 && src_sock_o
->data_buf
.totalsize
+
749 src_sock_o
->data_buf
.overhead
-
750 src_sock_o
->data_buf
.cpacket_buffer
<
751 (BUFFERLIMIT_SOCK_SOCK
*3)/4) {
752 if (src_sock_o
->source
.sock
.delay_flush
== 0) {
753 struct sock_buffertracker
*sbt
=
754 src_sock_o
->source
.sock
.sbt
;
755 list_add_tail(&(src_sock_o
->source
.sock
.delflush_list
),
756 &(sbt
->delflush_conns
));
758 src_sock_o
->source
.sock
.delay_flush
= 1;
760 if (src_sock_o
->source
.sock
.delay_flush
) {
761 list_del(&(src_sock_o
->source
.sock
.delflush_list
));
763 src_sock_o
->source
.sock
.delay_flush
= 0;
766 mutex_unlock(&(src_sock_o
->rcv_lock
));
767 mutex_unlock(&sock_bufferlimits_lock
);
769 if (likely(copied
> 0 || bufferfree
<= 0))
770 flush_buf(src_sock_o
);
772 if (copied
== -EAGAIN
&& blocking
) {
773 if (wait_event_interruptible(src_sock_o
->source
.sock
.wait
,
774 sendmsg_maypush(src_sock_o
)) == 0)
776 copied
= -ERESTARTSYS
;
779 BUG_ON(copied
> total_len
);
783 static int cor_readytoread(struct conn
*trgt_sock_o
)
786 mutex_lock(&(trgt_sock_o
->rcv_lock
));
787 rc
= (trgt_sock_o
->data_buf
.read_remaining
!= 0) ||
788 unlikely(atomic_read(&(trgt_sock_o
->isreset
)) != 0);
789 mutex_unlock(&(trgt_sock_o
->rcv_lock
));
793 int cor_recvmsg(struct kiocb
*iocb
, struct socket
*sock
, struct msghdr
*msg
,
794 size_t total_len
, int flags
)
796 struct conn
*src_sock_o
= (struct conn
*) sock
->sk
;
797 struct conn
*trgt_sock_o
;
800 int rc
= check_conn_state(src_sock_o
);
802 int blocking
= (flags
& MSG_DONTWAIT
) == 0;
807 trgt_sock_o
= src_sock_o
->reversedir
;
809 BUG_ON(trgt_sock_o
== 0);
812 mutex_lock(&(trgt_sock_o
->rcv_lock
));
814 if (unlikely(atomic_read(&(trgt_sock_o
->isreset
)) != 0)) {
819 copied
= databuf_pulluser(trgt_sock_o
, msg
);
820 databuf_ackread(trgt_sock_o
);
823 mutex_unlock(&(trgt_sock_o
->rcv_lock
));
825 if (likely(copied
> 0)) {
826 refresh_conn_credits(trgt_sock_o
, 0, 0);
827 #warning todo move unreserve calls to wake_sender
828 unreserve_sock_buffer(trgt_sock_o
);
829 wake_sender(trgt_sock_o
);
833 if (copied
== -EAGAIN
&& blocking
) {
834 if (wait_event_interruptible(trgt_sock_o
->target
.sock
.wait
,
835 cor_readytoread(trgt_sock_o
)) == 0)
837 copied
= -ERESTARTSYS
;
843 const struct proto_ops cor_proto_ops
= {
845 .owner
= THIS_MODULE
,
846 .release
= cor_socket_release
,
847 .bind
= cor_socket_bind
,
848 .connect
= cor_socket_connect
,
849 .accept
= cor_socket_accept
,
850 .listen
= cor_socket_listen
,
851 .shutdown
= cor_socket_shutdown
,
853 .sendmsg
= cor_sendmsg
,
854 .recvmsg
= cor_recvmsg
869 int cor_createsock(struct net
*net
, struct socket
*sock
, int protocol
)
871 if (unlikely(protocol
!= 0))
872 return -EPROTONOSUPPORT
;
874 sock
->state
= SS_UNCONNECTED
;
875 sock
->ops
= &cor_proto_ops
;
880 static struct net_proto_family cor_net_proto_family
= {
882 .create
= cor_createsock
,
886 static int __init
cor_sock_init(void)
888 INIT_WORK(&outofsockbufferspace_work
, outofsockbufferspace
);
889 outofsockbufferspace_scheduled
= 0;
891 sock_register(&cor_net_proto_family
);
892 sock_bufferusage
= 0;
896 module_init(cor_sock_init
);
898 MODULE_LICENSE("GPL");