/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
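
/*
 * Rough caller-side sketch of the flow described above, using the entry
 * points defined in this file. Illustrative only (not compiled): it assumes
 * the rpc_task and transport have already been set up by the higher-level
 * client code.
 */
#if 0
static void example_call_flow(struct rpc_task *task)
{
	if (xprt_reserve(task) < 0)	/* grab a request slot or sleep on the backlog */
		return;
	/* marshal the request into task->tk_rqstp->rq_svec here */
	xprt_transmit(task);		/* send the data and queue the task on xprt->pending */
	/* the data_ready handler wakes the task when a reply with a matching XID arrives */
	xprt_release(task);		/* return the slot and kick the backlog queue */
}
#endif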
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/net.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/checksum.h>

#include <asm/uaccess.h>

/* Following value should be > 32k + RPC overhead */
#define XPRT_MIN_WRITE_SPACE	35000
extern spinlock_t rpc_queue_lock;

/* Spinlock for critical sections in the code. */
spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	do_xprt_transmit(struct rpc_task *);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static void	xprt_remove_pending(struct rpc_xprt *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * debugging purposes).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8	*buf = (u8 *) packet;
	int	j;

	dprintk("RPC:      %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->user_data;
}
/*
 * Adjust the iovec to move on 'n' bytes
 */
static inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
	struct iovec *iv = msg->msg_iov;
	int i;

	/*
	 *	Eat any sent iovecs
	 */
	while (iv->iov_len <= amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	/*
	 *	And chew down the partial one
	 */
	niv[0].iov_len = iv->iov_len - amount;
	niv[0].iov_base = ((unsigned char *)iv->iov_base) + amount;
	iv++;

	/*
	 *	And copy any others
	 */
	for (i = 1; i < msg->msg_iovlen; i++)
		niv[i] = *iv++;

	msg->msg_iov = niv;
}
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
	int		slen = req->rq_slen - req->rq_bytes_sent;
	struct iovec	niv[MAX_IOVEC];

	if (slen <= 0)
		return 0;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov	= req->rq_svec;
	msg.msg_iovlen	= req->rq_snr;
	msg.msg_name	= (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;
	msg.msg_controllen = 0;

	/* Don't repeat bytes */
	if (req->rq_bytes_sent)
		xprt_move_iov(&msg, niv, req->rq_bytes_sent);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, slen);
	set_fs(oldfs);

	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			result = -ENOMEM;
		break;
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}
/*
 * Read data from socket
 */
static inline int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	struct iovec	niv[MAX_IOVEC];
	int		result;

	if (!sock)
		return -ENOTCONN;

	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov	= iov;
	msg.msg_iovlen	= nr;
	msg.msg_name	= NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;

	/* Adjust the iovec if we've already filled it */
	if (shift)
		xprt_move_iov(&msg, niv, shift);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);

	dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
						iov, len, result);
	return result;
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd = xprt->cwnd;

	spin_lock_bh(&xprt_sock_lock);
	if (result >= 0) {
		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
			goto out;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;

		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}

	xprt->cwnd = cwnd;
 out:
	spin_unlock_bh(&xprt_sock_lock);
}
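
/*
 * Worked example of the update above, assuming for illustration that
 * RPC_CWNDSCALE is 256: with cwnd = 512 (two slots), a successful reply
 * adds (256*256 + 256)/512 = 128, i.e. the window opens by half a slot,
 * while a timeout halves cwnd to 256. Growth is therefore roughly linear
 * and backoff multiplicative, smoothed over the congtime interval.
 */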
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}
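
/*
 * Example, assuming the UDP defaults set below (5 retries, 5 second
 * increment, linear backoff): to_current starts at 5 seconds and grows to
 * 10, 15, 20 and 25 seconds across successive retransmits, capped at
 * to_maxval; once the retries are exhausted the initial value itself is
 * bumped and to_current restarts from it.
 */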
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct sock	*sk = xprt->inet;

	if (!sk)
		return;

	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data    = NULL;
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;

	xprt_disconnect(xprt);

	sock_release(sock);
	/*
	 *	TCP doesn't require the rpciod now - other things may
	 *	but rpciod handles that not us.
	 */
	if (xprt->stream)
		rpciod_down();
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC:      disconnected transport %p\n", xprt);
	xprt->connected = 0;
	xprt->tcp_offset = 0;
	xprt->tcp_copied = 0;
	xprt_remove_pending(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock = xprt->sock;
	struct sock	*inet = xprt->inet;
	int		status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
				task->tk_pid, xprt, xprt->connected);

	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}

	spin_lock(&xprt_lock);
	if (xprt->connecting) {
		task->tk_timeout = 0;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		spin_unlock(&xprt_lock);
		return;
	}
	xprt->connecting = 1;
	spin_unlock(&xprt_lock);

	status = -ENOTCONN;
	if (!inet) {
		/* Create an unconnected socket */
		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
			goto defer;
		xprt_bind_socket(xprt, sock);
		inet = sock->sk;
	}

	xprt_disconnect(xprt);

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);
	if (status < 0 && status != -EINPROGRESS && status != -EALREADY) {
		printk("RPC: TCP connect error %d!\n", -status);
		goto defer;
	}

	dprintk("RPC: %4d connect status %d connected %d\n",
				task->tk_pid, status, xprt->connected);

	spin_lock_bh(&xprt_sock_lock);
	if (!xprt->connected) {
		task->tk_timeout = xprt->timeout.to_maxval;
		rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
		spin_unlock_bh(&xprt_sock_lock);
		return;
	}
	spin_unlock_bh(&xprt_sock_lock);

 defer:
	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	if (status < 0) {
		rpc_delay(task, 5*HZ);
		task->tk_status = -ENOTCONN;
	}
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}
/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
				task->tk_pid, task->tk_status);

	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}
/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task	*head, *task;
	struct rpc_rqst	*req;
	int		safe = 0;

	spin_lock_bh(&rpc_queue_lock);
	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				goto out;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				goto out_bad;
			}
		} while (task != head);
	}
	dprintk("RPC:      unknown XID %08x in reply.\n", xid);
 out_bad:
	req = NULL;
 out:
	if (req && !rpc_lock_task(req->rq_task))
		req = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	return req;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;

	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	{
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
}
/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int csum_partial_copy_to_page_cache(struct iovec *iov,
					    struct sk_buff *skb,
					    int copied)
{
	__u8 *pkt_data = skb->data + sizeof(struct udphdr);
	__u8 *cur_ptr = iov->iov_base;
	__kernel_size_t cur_len = iov->iov_len;
	unsigned int csum = skb->csum;
	int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
	int slack = skb->len - copied - sizeof(struct udphdr);

	if (need_csum)
		csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
	while (copied > 0) {
		if (cur_len) {
			int to_move = cur_len;
			if (to_move > copied)
				to_move = copied;
			if (need_csum)
				csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
								 to_move, csum);
			else
				memcpy(cur_ptr, pkt_data, to_move);
			pkt_data += to_move;
			copied -= to_move;
			cur_ptr += to_move;
			cur_len -= to_move;
		}
		if (cur_len <= 0) {
			iov++;
			cur_len = iov->iov_len;
			cur_ptr = iov->iov_base;
		}
	}
	if (need_csum) {
		if (slack > 0)
			csum = csum_partial(pkt_data, slack, csum);
		if ((unsigned short)csum_fold(csum))
			return -1;
	}
	return 0;
}
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	int		err, repsize, copied;

	dprintk("RPC:      udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC:      udp_data_ready request not found!\n");
		return;
	}

	dprintk("RPC:      udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		return;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
		     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	rovr->rq_damaged = 1;
	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	rpc_unlock_task(task);

 dropit:
	skb_free_datagram(sk, skb);
}
/*
 * TCP read fragment marker
 */
static inline int
tcp_read_fraghdr(struct rpc_xprt *xprt)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
	}
	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
		goto done;

	want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	dprintk("RPC:      reading header (%d bytes)\n", want);
	do {
		riov.iov_base = ((u8 *) &xprt->tcp_recm) + xprt->tcp_offset;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		want -= result;
	} while (want);

	/* Is this another fragment in the last message */
	if (!xprt->tcp_more)
		xprt->tcp_copied = 0; /* No, so we're reading a new message */

	/* Get the record length and mask out the last fragment bit */
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
	xprt->tcp_reclen &= 0x7fffffff;

	dprintk("RPC:      New record reclen %d morefrags %d\n",
				xprt->tcp_reclen, xprt->tcp_more);
 done:
	return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
}
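
/*
 * Example of the record marking decoded above: a 300-byte reply sent as a
 * single, final fragment is preceded on the wire by the 4-byte marker
 * 0x8000012c -- the top bit flags the last fragment, the low 31 bits carry
 * the fragment length -- so tcp_read_fraghdr() ends up with tcp_reclen = 300
 * and tcp_more = 0 for it.
 */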
/*
 * TCP read xid
 */
static inline int
tcp_read_xid(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
		goto done;
	want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
	do {
		dprintk("RPC:      reading xid (%d bytes)\n", want);
		riov.iov_base = ((u8 *) &xprt->tcp_xid) + xprt->tcp_copied;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		avail  -= result;
		want   -= result;
	} while (want);
 done:
	return avail;
}
/*
 * TCP read and complete request
 */
static inline int
tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
{
	int	want, result;

	if (req->rq_rlen <= xprt->tcp_copied || !avail)
		goto done;
	want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
	do {
		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);

		result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		avail  -= result;
		want   -= result;
	} while (want);

 done:
	if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
		return avail;
	dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
	xprt_complete_rqst(xprt, req, xprt->tcp_copied);

	return avail;
}
/*
 * TCP discard extra bytes from a short read
 */
static inline int
tcp_read_discard(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	static u8	dummy[64];
	int		want, result = 0;

	while (avail) {
		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len  = want;
		dprintk("RPC:      TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		avail -= result;
	}
	return avail;
}
/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom thrice--
 * first receiving the record marker, then the XID, then the data.
 *
 * The optimal solution would be a RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = NULL;
	struct rpc_task	*task = NULL;
	int		avail, result;

	dprintk("RPC:      tcp_input_record\n");

	if (!xprt->connected)
		return -ENOTCONN;

	/* Read in a new fragment marker if necessary */
	/* Can we ever really expect to get completely empty fragments? */
	if ((result = tcp_read_fraghdr(xprt)) <= 0)
		return result;
	avail = result;

	/* Read in the xid if necessary */
	if ((result = tcp_read_xid(xprt, avail)) <= 0)
		return result;
	avail = result;

	/* Find and lock the request corresponding to this xid */
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (req) {
		task = req->rq_task;
		if (xprt->tcp_copied == sizeof(xprt->tcp_xid) || req->rq_damaged) {
			/* Read in the request data */
			result = tcp_read_request(xprt, req, avail);
		}
		rpc_unlock_task(task);
		if (result < 0)
			return result;
		avail = result;
	}

	/* Skip over any trailing bytes on short reads */
	if ((result = tcp_read_discard(xprt, avail)) < 0)
		return result;

	dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
	result = xprt->tcp_reclen;
	return result;
}
/*
 *	TCP task queue stuff
 */
LIST_HEAD(rpc_xprt_pending);	/* List of xprts having pending tcp requests */

void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}

void xprt_append_pending(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->rx_pending))
		return;
	spin_lock_bh(&rpc_queue_lock);
	if (list_empty(&xprt->rx_pending)) {
		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
		dprintk("RPC:     xprt queue %p\n", xprt);
		tcp_rpciod_queue();
	}
	spin_unlock_bh(&rpc_queue_lock);
}

static void xprt_remove_pending(struct rpc_xprt *xprt)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&xprt->rx_pending)) {
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
}

struct rpc_xprt *xprt_remove_pending_next(void)
{
	struct rpc_xprt	*xprt = NULL;

	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&rpc_xprt_pending)) {
		xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
	return xprt;
}
/*
 *	This is protected from tcp_data_ready and the stack as it's run
 *	inside of the RPC I/O daemon
 */
void
__rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int safe_retry = 0, result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 *	Empty each pending socket
	 */
	while ((xprt = xprt_remove_pending_next()) != NULL) {
		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do {
			result = tcp_input_record(xprt);
		} while (result >= 0);

		if (safe_retry++ > 200) {
			schedule();
			safe_retry = 0;
		}
	}
}
/*
 *	data_ready callback for TCP. We can't just jump into the
 *	tcp recvmsg functions inside of the network receive bh or
 *	bad things occur. We queue it to pick up after networking
 *	is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt	*xprt;

	dprintk("RPC:      tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("Not a socket with xprt %p\n", sk);
		return;
	}

	xprt_append_pending(xprt);

	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);
}
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);

	spin_lock_bh(&xprt_sock_lock);
	switch (sk->state) {
	case TCP_ESTABLISHED:
		xprt->connected = 1;
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
			rpc_wake_up_task(xprt->snd_task);
		rpc_wake_up(&xprt->reconn);
		break;
	default:
		xprt->connected = 0;
		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		break;
	}
	spin_unlock_bh(&xprt_sock_lock);
}
/*
 * The following 2 routines allow a task to sleep while socket memory is
 * low.
 */
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;

	/* Wait until we have enough socket memory */
	if (sock_wspace(sk) < min(sk->sndbuf, XPRT_MIN_WRITE_SPACE))
		return;

	spin_lock_bh(&xprt_sock_lock);
	if (xprt->write_space)
		goto out_unlock;

	xprt->write_space = 1;

	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
		rpc_wake_up_task(xprt->snd_task);
 out_unlock:
	spin_unlock_bh(&xprt_sock_lock);
}
static void
udp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;

	/* Wait until we have enough socket memory */
	if (sock_wspace(sk) < min(sk->sndbuf, XPRT_MIN_WRITE_SPACE))
		return;

	spin_lock_bh(&xprt_sock_lock);
	if (xprt->write_space)
		goto out_unlock;

	xprt->write_space = 1;

	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
		rpc_wake_up_task(xprt->snd_task);
 out_unlock:
	spin_unlock_bh(&xprt_sock_lock);
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	if (req)
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * Serialize access to sockets, in order to prevent different
 * requests from interfering with each other.
 */
static int
xprt_down_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	spin_lock(&xprt_lock);
	if (xprt->snd_task && xprt->snd_task != task) {
		dprintk("RPC: %4d TCP write queue full (task %d)\n",
			task->tk_pid, xprt->snd_task->tk_pid);
		task->tk_timeout = 0;
		task->tk_status = -EAGAIN;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	} else if (!xprt->snd_task) {
		xprt->snd_task = task;
		req->rq_xtime = jiffies;
		req->rq_bytes_sent = 0;
	}
	spin_unlock(&xprt_lock);
	return xprt->snd_task == task;
}
/*
 * Releases the socket for use by other requests.
 */
static void
xprt_up_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	if (xprt->snd_task && xprt->snd_task == task) {
		spin_lock(&xprt_lock);
		xprt->snd_task = NULL;
		rpc_wake_up_next(&xprt->sending);
		spin_unlock(&xprt_lock);
	}
}
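
/*
 * For reference, the pairing in this file is: xprt_transmit() acquires the
 * send "lock" through xprt_down_transmit(), while do_xprt_transmit() and
 * xprt_release() give it back through xprt_up_transmit(), waking the next
 * task sleeping on xprt->sending.
 */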
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
				*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown)
		task->tk_status = -EIO;

	if (!xprt->connected)
		task->tk_status = -ENOTCONN;

	if (task->tk_status < 0)
		return;

	if (task->tk_rpcwait)
		rpc_remove_wait_queue(task);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32	*marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	if (!xprt_down_transmit(task))
		return;

	do_xprt_transmit(task);
}
static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status, retry = 0;

	/* For fast networks/servers we have to put the request on
	 * the pending list now:
	 * Note that we don't want the task timing out during the
	 * call to xprt_sendmsg(), so we initially disable the timeout,
	 * and then reset it later...
	 */
	xprt_receive(task);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		xprt->write_space = 0;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			if (req->rq_bytes_sent >= req->rq_slen)
				goto out_receive;
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -ENOMEM;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;
	}
	rpc_unlock_task(task);

	task->tk_status = status;

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	rpc_wake_up_task(task);
	if (!RPC_IS_RUNNING(task))
		goto out_release;

	switch (status) {
	case -ENOMEM:
		/* Protect against (udp|tcp)_write_space */
		task->tk_timeout = req->rq_timeout.to_current;
		spin_lock_bh(&xprt_sock_lock);
		if (!xprt->write_space)
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		spin_unlock_bh(&xprt_sock_lock);
		return;
	case -EAGAIN:
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	default:
		goto out_release;
	}

 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	task->tk_timeout = req->rq_timeout.to_current;
	rpc_add_timer(task, xprt_timer);
	rpc_unlock_task(task);
 out_release:
	xprt_up_transmit(task);
}
/*
 * Queue the task for a reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);

	task->tk_timeout = 0;
	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
}
/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	/* We already have an initialized request. */
	if (task->tk_rqstp)
		return 0;

	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
				task->tk_pid, xprt->cong, xprt->cwnd);
	spin_lock_bh(&xprt_sock_lock);
	xprt_reserve_status(task);
	if (task->tk_rqstp) {
		task->tk_timeout = 0;
	} else if (!task->tk_timeout) {
		task->tk_status = -ENOBUFS;
	} else {
		dprintk("RPC:      xprt_reserve waiting on backlog\n");
		task->tk_status = -EAGAIN;
		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
	}
	spin_unlock_bh(&xprt_sock_lock);
	dprintk("RPC: %4d xprt_reserve returns %d\n",
				task->tk_pid, task->tk_status);
	return task->tk_status;
}
/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (xprt->shutdown) {
		task->tk_status = -EIO;
	} else if (task->tk_status < 0) {
		/* NOP: keep the existing error status */
	} else if (task->tk_rqstp) {
		/* We've already been given a request slot: NOP */
	} else {
		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
			goto out_nofree;
		/* OK: There's room for us. Grab a free slot and bump
		 * congestion value */
		xprt->free = req->rq_next;
		req->rq_next = NULL;
		xprt->cong += RPC_CWNDSCALE;
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);

		xprt_clear_backlog(xprt);
	}

	return;

 out_nofree:
	task->tk_status = -EAGAIN;
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	static u32	xid = 0;

	if (!xid)
		xid = CURRENT_TIME << 12;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	task->tk_status = 0;
	req->rq_timeout = xprt->timeout;
	req->rq_task	= task;
	req->rq_xprt	= xprt;
	req->rq_xid	= xid++;
}
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	xprt_up_transmit(task);
	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
		rpc_remove_wait_queue(task);
	}

	spin_lock_bh(&xprt_sock_lock);
	req->rq_next = xprt->free;
	xprt->free   = req;

	/* Decrease congestion value. */
	xprt->cong -= RPC_CWNDSCALE;

	xprt_clear_backlog(xprt);
	spin_unlock_bh(&xprt_sock_lock);
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5, 5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}
/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current   =
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_resrvval  = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
			struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	int		i;

	dprintk("RPC:      setting up %s transport...\n",
				proto == IPPROTO_UDP? "UDP" : "TCP");

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream)
		xprt->cwnd = RPC_MAXCWND;
	else
		xprt->cwnd = RPC_INITCWND;
	xprt->congtime = jiffies;
	init_waitqueue_head(&xprt->cong_wait);

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
		xprt->timeout.to_resrvval = to->to_maxval << 1;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	INIT_LIST_HEAD(&xprt->rx_pending);

	dprintk("RPC:      created transport %p\n", xprt);

	xprt_bind_socket(xprt, sock);
	return xprt;
}
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int		err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}
static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock	*sk = sock->sk;

	if (xprt->inet)
		return -EBUSY;

	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->write_space = udp_write_space;
		sk->no_check = UDP_CSUM_NORCV;
		xprt->connected = 1;
	} else {
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		sk->write_space = tcp_write_space;
		xprt->connected = 0;
	}

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	/*
	 *	TCP requires the rpc I/O daemon is present
	 */
	if (xprt->stream)
		rpciod_up();

	return 0;
}
/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC:      xprt_create_socket(%s %d)\n",
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		return NULL;
	}

	/* If the caller has the capability, bind to a reserved port */
	if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0) {
		sock_release(sock);
		return NULL;
	}

	return sock;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	struct rpc_xprt	*xprt;

	dprintk("RPC:      xprt_create_proto called\n");

	if (!(sock = xprt_create_socket(proto, to)))
		return NULL;

	if (!(xprt = xprt_setup(sock, proto, sap, to)))
		sock_release(sock);

	return xprt;
}
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	rpc_wake_up(&xprt->reconn);
	wake_up(&xprt->cong_wait);
}
/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt) {
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	rpc_wake_up_next(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	return 1;
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC:      destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}