2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
41 * Rewrite of larges part of the code in order to stabilize TCP stuff.
42 * Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
46 #define __KERNEL_SYSCALLS__
48 #include <linux/version.h>
49 #include <linux/types.h>
50 #include <linux/slab.h>
51 #include <linux/capability.h>
52 #include <linux/sched.h>
53 #include <linux/errno.h>
54 #include <linux/socket.h>
56 #include <linux/net.h>
58 #include <linux/udp.h>
59 #include <linux/tcp.h>
60 #include <linux/unistd.h>
61 #include <linux/sunrpc/clnt.h>
62 #include <linux/file.h>
65 #include <net/checksum.h>
69 #include <asm/uaccess.h>
76 # undef RPC_DEBUG_DATA
77 # define RPCDBG_FACILITY RPCDBG_XPRT
80 #define XPRT_MAX_BACKOFF (8)
85 static void xprt_request_init(struct rpc_task
*, struct rpc_xprt
*);
86 static void do_xprt_transmit(struct rpc_task
*);
87 static inline void do_xprt_reserve(struct rpc_task
*);
88 static void xprt_disconnect(struct rpc_xprt
*);
89 static void xprt_conn_status(struct rpc_task
*task
);
90 static struct rpc_xprt
* xprt_setup(int proto
, struct sockaddr_in
*ap
,
91 struct rpc_timeout
*to
);
92 static struct socket
*xprt_create_socket(int, struct rpc_timeout
*, int);
93 static void xprt_bind_socket(struct rpc_xprt
*, struct socket
*);
94 static int __xprt_get_cong(struct rpc_xprt
*, struct rpc_task
*);
98 * Print the buffer contents (first 128 bytes only--just enough for
102 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
104 u8
*buf
= (u8
*) packet
;
107 dprintk("RPC: %s\n", msg
);
108 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
112 dprintk("0x%04x ", j
);
114 dprintk("%02x%02x%02x%02x ",
115 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
121 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
128 * Look up RPC transport given an INET socket
130 static inline struct rpc_xprt
*
131 xprt_from_sock(struct sock
*sk
)
133 return (struct rpc_xprt
*) sk
->user_data
;
137 * Serialize write access to sockets, in order to prevent different
138 * requests from interfering with each other.
139 * Also prevents TCP socket connects from colliding with writes.
142 __xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
144 if (!xprt
->snd_task
) {
145 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
))
146 xprt
->snd_task
= task
;
148 if (xprt
->snd_task
!= task
) {
149 dprintk("RPC: %4d TCP write queue full\n", task
->tk_pid
);
150 task
->tk_timeout
= 0;
151 task
->tk_status
= -EAGAIN
;
152 if (task
->tk_rqstp
&& task
->tk_rqstp
->rq_nresend
)
153 rpc_sleep_on(&xprt
->resend
, task
, NULL
, NULL
);
155 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
157 return xprt
->snd_task
== task
;
161 xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
165 spin_lock_bh(&xprt
->sock_lock
);
166 retval
= __xprt_lock_write(xprt
, task
);
167 spin_unlock_bh(&xprt
->sock_lock
);
173 __xprt_lock_write_next(struct rpc_xprt
*xprt
)
175 struct rpc_task
*task
;
179 if (!xprt
->nocong
&& RPCXPRT_CONGESTED(xprt
))
181 task
= rpc_wake_up_next(&xprt
->resend
);
183 task
= rpc_wake_up_next(&xprt
->sending
);
187 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
))
188 xprt
->snd_task
= task
;
192 * Releases the socket for use by other requests.
195 __xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
197 if (xprt
->snd_task
== task
)
198 xprt
->snd_task
= NULL
;
199 __xprt_lock_write_next(xprt
);
203 xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
205 spin_lock_bh(&xprt
->sock_lock
);
206 __xprt_release_write(xprt
, task
);
207 spin_unlock_bh(&xprt
->sock_lock
);
211 * Write data to socket.
214 xprt_sendmsg(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
216 struct socket
*sock
= xprt
->sock
;
218 struct xdr_buf
*xdr
= &req
->rq_snd_buf
;
219 struct iovec niv
[MAX_IOVEC
];
220 unsigned int niov
, slen
, skip
;
227 xprt_pktdump("packet data:",
228 req
->rq_svec
->iov_base
,
229 req
->rq_svec
->iov_len
);
231 /* Dont repeat bytes */
232 skip
= req
->rq_bytes_sent
;
233 slen
= xdr
->len
- skip
;
234 niov
= xdr_kmap(niv
, xdr
, skip
);
236 msg
.msg_flags
= MSG_DONTWAIT
|MSG_NOSIGNAL
;
238 msg
.msg_iovlen
= niov
;
239 msg
.msg_name
= (struct sockaddr
*) &xprt
->addr
;
240 msg
.msg_namelen
= sizeof(xprt
->addr
);
241 msg
.msg_control
= NULL
;
242 msg
.msg_controllen
= 0;
244 oldfs
= get_fs(); set_fs(get_ds());
245 clear_bit(SOCK_ASYNC_NOSPACE
, &sock
->flags
);
246 result
= sock_sendmsg(sock
, &msg
, slen
);
249 xdr_kunmap(xdr
, skip
);
251 dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen
, result
);
258 /* When the server has died, an ICMP port unreachable message
259 * prompts ECONNREFUSED.
265 /* connection broken */
270 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
276 * Van Jacobson congestion avoidance. Check if the congestion window
277 * overflowed. Put the task to sleep if this is the case.
280 __xprt_get_cong(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
282 struct rpc_rqst
*req
= task
->tk_rqstp
;
286 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
287 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
288 if (RPCXPRT_CONGESTED(xprt
))
291 xprt
->cong
+= RPC_CWNDSCALE
;
296 * Adjust the congestion window, and wake up the next task
297 * that has been sleeping due to congestion
300 __xprt_put_cong(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
305 xprt
->cong
-= RPC_CWNDSCALE
;
306 __xprt_lock_write_next(xprt
);
310 * Adjust RPC congestion window
311 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
314 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
319 if (result
>= 0 && cwnd
<= xprt
->cong
) {
320 /* The (cwnd >> 1) term makes sure
321 * the result gets rounded properly. */
322 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
323 if (cwnd
> RPC_MAXCWND
)
325 __xprt_lock_write_next(xprt
);
326 } else if (result
== -ETIMEDOUT
) {
328 if (cwnd
< RPC_CWNDSCALE
)
329 cwnd
= RPC_CWNDSCALE
;
331 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
332 xprt
->cong
, xprt
->cwnd
, cwnd
);
337 * Adjust timeout values etc for next retransmit
340 xprt_adjust_timeout(struct rpc_timeout
*to
)
342 if (to
->to_retries
> 0) {
343 if (to
->to_exponential
)
344 to
->to_current
<<= 1;
346 to
->to_current
+= to
->to_increment
;
347 if (to
->to_maxval
&& to
->to_current
>= to
->to_maxval
)
348 to
->to_current
= to
->to_maxval
;
350 if (to
->to_exponential
)
351 to
->to_initval
<<= 1;
353 to
->to_initval
+= to
->to_increment
;
354 if (to
->to_maxval
&& to
->to_initval
>= to
->to_maxval
)
355 to
->to_initval
= to
->to_maxval
;
356 to
->to_current
= to
->to_initval
;
359 if (!to
->to_current
) {
360 printk(KERN_WARNING
"xprt_adjust_timeout: to_current = 0!\n");
361 to
->to_current
= 5 * HZ
;
363 pprintk("RPC: %lu %s\n", jiffies
,
364 to
->to_retries
? "retrans" : "timeout");
365 return to
->to_retries
-- > 0;
369 * Close down a transport socket
372 xprt_close(struct rpc_xprt
*xprt
)
374 struct socket
*sock
= xprt
->sock
;
375 struct sock
*sk
= xprt
->inet
;
383 sk
->user_data
= NULL
;
384 sk
->data_ready
= xprt
->old_data_ready
;
385 sk
->state_change
= xprt
->old_state_change
;
386 sk
->write_space
= xprt
->old_write_space
;
388 xprt_disconnect(xprt
);
395 * Mark a transport as disconnected
398 xprt_disconnect(struct rpc_xprt
*xprt
)
400 dprintk("RPC: disconnected transport %p\n", xprt
);
401 xprt_clear_connected(xprt
);
402 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
406 * Attempt to connect a TCP socket.
408 * NB: This never collides with TCP reads, as both run from rpciod
411 xprt_connect(struct rpc_task
*task
)
413 struct rpc_xprt
*xprt
= task
->tk_xprt
;
414 struct socket
*sock
= xprt
->sock
;
418 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task
->tk_pid
,
419 xprt
, (xprt_connected(xprt
) ? "is" : "is not"));
421 if (xprt
->shutdown
) {
422 task
->tk_status
= -EIO
;
425 if (!xprt
->addr
.sin_port
) {
426 task
->tk_status
= -EIO
;
429 if (!xprt_lock_write(xprt
, task
))
431 if (xprt_connected(xprt
))
435 * We're here because the xprt was marked disconnected.
436 * Start by resetting any existing state.
439 if (!(sock
= xprt_create_socket(xprt
->prot
, &xprt
->timeout
, xprt
->resvport
))) {
440 /* couldn't create socket or bind to reserved port;
441 * this is likely a permanent error, so cause an abort */
442 task
->tk_status
= -EIO
;
445 xprt_bind_socket(xprt
, sock
);
449 * Tell the socket layer to start connecting...
451 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
452 sizeof(xprt
->addr
), O_NONBLOCK
);
453 dprintk("RPC: %4d connect status %d connected %d sock state %d\n",
454 task
->tk_pid
, -status
, xprt_connected(xprt
), inet
->state
);
459 /* Protect against TCP socket state changes */
461 if (inet
->state
!= TCP_ESTABLISHED
) {
462 dprintk("RPC: %4d waiting for connection\n",
464 task
->tk_timeout
= RPC_CONNECT_TIMEOUT
;
465 /* if the socket is already closing, delay briefly */
466 if ((1 << inet
->state
) & ~(TCPF_SYN_SENT
|TCPF_SYN_RECV
))
467 task
->tk_timeout
= RPC_REESTABLISH_TIMEOUT
;
468 rpc_sleep_on(&xprt
->pending
, task
, xprt_conn_status
,
471 /* task status set when task wakes up again */
479 case -EISCONN
: /* not likely, but just in case */
480 /* Half closed state. No race -- this socket is dead. */
481 if (inet
->state
!= TCP_ESTABLISHED
) {
483 task
->tk_status
= -EAGAIN
;
487 /* Otherwise, the connection is already established. */
493 task
->tk_status
= -ENOTCONN
;
497 /* Report myriad other possible returns. If this file
498 * system is soft mounted, just error out, like Solaris. */
500 if (task
->tk_client
->cl_softrtry
) {
502 "RPC: error %d connecting to server %s, exiting\n",
503 -status
, task
->tk_client
->cl_server
);
504 task
->tk_status
= -EIO
;
507 "RPC: error %d connecting to server %s\n",
508 -status
, task
->tk_client
->cl_server
);
509 rpc_delay(task
, RPC_REESTABLISH_TIMEOUT
);
510 task
->tk_status
= status
;
516 xprt_release_write(xprt
, task
);
520 * We arrive here when awoken from waiting on connection establishment.
523 xprt_conn_status(struct rpc_task
*task
)
525 struct rpc_xprt
*xprt
= task
->tk_xprt
;
527 switch (task
->tk_status
) {
529 dprintk("RPC: %4d xprt_conn_status: connection established\n",
533 dprintk("RPC: %4d xprt_conn_status: timed out\n",
535 /* prevent TCP from continuing to retry SYNs */
539 printk(KERN_ERR
"RPC: error %d connecting to server %s\n",
540 -task
->tk_status
, task
->tk_client
->cl_server
);
542 rpc_delay(task
, RPC_REESTABLISH_TIMEOUT
);
545 /* if soft mounted, cause this RPC to fail */
546 if (task
->tk_client
->cl_softrtry
)
547 task
->tk_status
= -EIO
;
550 xprt_release_write(xprt
, task
);
554 * Look up the RPC request corresponding to a reply, and then lock it.
556 static inline struct rpc_rqst
*
557 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
559 struct list_head
*pos
;
560 struct rpc_rqst
*req
= NULL
;
562 list_for_each(pos
, &xprt
->recv
) {
563 struct rpc_rqst
*entry
= list_entry(pos
, struct rpc_rqst
, rq_list
);
564 if (entry
->rq_xid
== xid
) {
573 * Complete reply received.
574 * The TCP code relies on us to remove the request from xprt->pending.
577 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
579 struct rpc_task
*task
= req
->rq_task
;
580 struct rpc_clnt
*clnt
= task
->tk_client
;
582 /* Adjust congestion window */
584 xprt_adjust_cwnd(xprt
, copied
);
585 __xprt_put_cong(xprt
, req
);
586 if (!req
->rq_nresend
) {
588 task
->tk_msg
.rpc_proc
->p_timer
;
590 rpc_update_rtt(&clnt
->cl_rtt
, timer
,
591 (long)jiffies
- req
->rq_xtime
);
593 rpc_clear_timeo(&clnt
->cl_rtt
);
597 /* Profile only reads for now */
599 static unsigned long nextstat
= 0;
600 static unsigned long pkt_rtt
= 0, pkt_len
= 0, pkt_cnt
= 0;
603 pkt_len
+= req
->rq_slen
+ copied
;
604 pkt_rtt
+= jiffies
- req
->rq_xtime
;
605 if (time_before(nextstat
, jiffies
)) {
606 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
607 printk("RPC: %ld %ld %ld %ld stat\n",
608 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
609 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
610 nextstat
= jiffies
+ 5 * HZ
;
615 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
616 req
->rq_received
= copied
;
617 list_del_init(&req
->rq_list
);
619 /* ... and wake up the process. */
620 rpc_wake_up_task(task
);
625 skb_read_bits(skb_reader_t
*desc
, void *to
, size_t len
)
627 if (len
> desc
->count
)
629 skb_copy_bits(desc
->skb
, desc
->offset
, to
, len
);
636 skb_read_and_csum_bits(skb_reader_t
*desc
, void *to
, size_t len
)
638 unsigned int csum2
, pos
;
640 if (len
> desc
->count
)
643 csum2
= skb_copy_and_csum_bits(desc
->skb
, pos
, to
, len
, 0);
644 desc
->csum
= csum_block_add(desc
->csum
, csum2
, pos
);
651 * We have set things up such that we perform the checksum of the UDP
652 * packet in parallel with the copies into the RPC client iovec. -DaveM
655 csum_partial_copy_to_xdr(struct xdr_buf
*xdr
, struct sk_buff
*skb
)
660 desc
.offset
= sizeof(struct udphdr
);
661 desc
.count
= skb
->len
- desc
.offset
;
663 if (skb
->ip_summed
== CHECKSUM_UNNECESSARY
)
666 desc
.csum
= csum_partial(skb
->data
, desc
.offset
, skb
->csum
);
667 xdr_partial_copy_from_skb(xdr
, 0, &desc
, skb_read_and_csum_bits
);
668 if (desc
.offset
!= skb
->len
) {
670 csum2
= skb_checksum(skb
, desc
.offset
, skb
->len
- desc
.offset
, 0);
671 desc
.csum
= csum_block_add(desc
.csum
, csum2
, desc
.offset
);
673 if ((unsigned short)csum_fold(desc
.csum
))
677 xdr_partial_copy_from_skb(xdr
, 0, &desc
, skb_read_bits
);
682 * Input handler for RPC replies. Called from a bottom half and hence
686 udp_data_ready(struct sock
*sk
, int len
)
688 struct rpc_task
*task
;
689 struct rpc_xprt
*xprt
;
690 struct rpc_rqst
*rovr
;
692 int err
, repsize
, copied
;
694 dprintk("RPC: udp_data_ready...\n");
695 if (!(xprt
= xprt_from_sock(sk
))) {
696 printk("RPC: udp_data_ready request not found!\n");
700 dprintk("RPC: udp_data_ready client %p\n", xprt
);
702 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
708 repsize
= skb
->len
- sizeof(struct udphdr
);
710 printk("RPC: impossible RPC reply size %d!\n", repsize
);
714 /* Look up and lock the request corresponding to the given XID */
715 spin_lock(&xprt
->sock_lock
);
716 rovr
= xprt_lookup_rqst(xprt
, *(u32
*) (skb
->h
.raw
+ sizeof(struct udphdr
)));
719 task
= rovr
->rq_task
;
721 dprintk("RPC: %4d received reply\n", task
->tk_pid
);
722 xprt_pktdump("packet data:",
723 (u32
*) (skb
->h
.raw
+sizeof(struct udphdr
)), repsize
);
725 if ((copied
= rovr
->rq_rlen
) > repsize
)
728 /* Suck it into the iovec, verify checksum if not done by hw. */
729 if (csum_partial_copy_to_xdr(&rovr
->rq_rcv_buf
, skb
))
732 /* Something worked... */
733 dst_confirm(skb
->dst
);
735 xprt_complete_rqst(xprt
, rovr
, copied
);
738 spin_unlock(&xprt
->sock_lock
);
740 skb_free_datagram(sk
, skb
);
742 if (sk
->sleep
&& waitqueue_active(sk
->sleep
))
743 wake_up_interruptible(sk
->sleep
);
747 * Copy from an skb into memory and shrink the skb.
750 tcp_copy_data(skb_reader_t
*desc
, void *p
, size_t len
)
752 if (len
> desc
->count
)
754 skb_copy_bits(desc
->skb
, desc
->offset
, p
, len
);
761 * TCP read fragment marker
764 tcp_read_fraghdr(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
769 p
= ((char *) &xprt
->tcp_recm
) + xprt
->tcp_offset
;
770 len
= sizeof(xprt
->tcp_recm
) - xprt
->tcp_offset
;
771 used
= tcp_copy_data(desc
, p
, len
);
772 xprt
->tcp_offset
+= used
;
775 xprt
->tcp_reclen
= ntohl(xprt
->tcp_recm
);
776 if (xprt
->tcp_reclen
& 0x80000000)
777 xprt
->tcp_flags
|= XPRT_LAST_FRAG
;
779 xprt
->tcp_flags
&= ~XPRT_LAST_FRAG
;
780 xprt
->tcp_reclen
&= 0x7fffffff;
781 xprt
->tcp_flags
&= ~XPRT_COPY_RECM
;
782 xprt
->tcp_offset
= 0;
783 /* Sanity check of the record length */
784 if (xprt
->tcp_reclen
< 4) {
785 printk(KERN_ERR
"RPC: Invalid TCP record fragment length\n");
786 xprt_disconnect(xprt
);
788 dprintk("RPC: reading TCP record fragment of length %d\n",
793 tcp_check_recm(struct rpc_xprt
*xprt
)
795 if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
796 xprt
->tcp_flags
|= XPRT_COPY_RECM
;
797 xprt
->tcp_offset
= 0;
798 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
) {
799 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
800 xprt
->tcp_flags
|= XPRT_COPY_XID
;
801 xprt
->tcp_copied
= 0;
810 tcp_read_xid(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
815 len
= sizeof(xprt
->tcp_xid
) - xprt
->tcp_offset
;
816 dprintk("RPC: reading XID (%Zu bytes)\n", len
);
817 p
= ((char *) &xprt
->tcp_xid
) + xprt
->tcp_offset
;
818 used
= tcp_copy_data(desc
, p
, len
);
819 xprt
->tcp_offset
+= used
;
822 xprt
->tcp_flags
&= ~XPRT_COPY_XID
;
823 xprt
->tcp_flags
|= XPRT_COPY_DATA
;
824 xprt
->tcp_copied
= 4;
825 dprintk("RPC: reading reply for XID %08x\n", xprt
->tcp_xid
);
826 tcp_check_recm(xprt
);
830 * TCP read and complete request
833 tcp_read_request(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
835 struct rpc_rqst
*req
;
836 struct xdr_buf
*rcvbuf
;
839 /* Find and lock the request corresponding to this xid */
840 spin_lock(&xprt
->sock_lock
);
841 req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
);
843 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
844 dprintk("RPC: XID %08x request not found!\n",
846 spin_unlock(&xprt
->sock_lock
);
850 rcvbuf
= &req
->rq_rcv_buf
;
852 if (len
> xprt
->tcp_reclen
- xprt
->tcp_offset
) {
853 skb_reader_t my_desc
;
855 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
856 memcpy(&my_desc
, desc
, sizeof(my_desc
));
858 xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
859 &my_desc
, tcp_copy_data
);
863 xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
864 desc
, tcp_copy_data
);
865 xprt
->tcp_copied
+= len
;
866 xprt
->tcp_offset
+= len
;
868 if (xprt
->tcp_copied
== req
->rq_rlen
)
869 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
870 else if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
871 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
)
872 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
875 if (!(xprt
->tcp_flags
& XPRT_COPY_DATA
)) {
876 dprintk("RPC: %4d received reply complete\n",
877 req
->rq_task
->tk_pid
);
878 xprt_complete_rqst(xprt
, req
, xprt
->tcp_copied
);
880 spin_unlock(&xprt
->sock_lock
);
881 tcp_check_recm(xprt
);
885 * TCP discard extra bytes from a short read
888 tcp_read_discard(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
892 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
893 if (len
> desc
->count
)
897 xprt
->tcp_offset
+= len
;
898 tcp_check_recm(xprt
);
902 * TCP record receive routine
903 * We first have to grab the record marker, then the XID, then the data.
906 tcp_data_recv(read_descriptor_t
*rd_desc
, struct sk_buff
*skb
,
907 unsigned int offset
, size_t len
)
909 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)rd_desc
->buf
;
910 skb_reader_t desc
= {
917 dprintk("RPC: tcp_data_recv\n");
919 /* Read in a new fragment marker if necessary */
920 /* Can we ever really expect to get completely empty fragments? */
921 if (xprt
->tcp_flags
& XPRT_COPY_RECM
) {
922 tcp_read_fraghdr(xprt
, &desc
);
925 /* Read in the xid if necessary */
926 if (xprt
->tcp_flags
& XPRT_COPY_XID
) {
927 tcp_read_xid(xprt
, &desc
);
930 /* Read in the request data */
931 if (xprt
->tcp_flags
& XPRT_COPY_DATA
) {
932 tcp_read_request(xprt
, &desc
);
935 /* Skip over any trailing bytes on short reads */
936 tcp_read_discard(xprt
, &desc
);
937 } while (desc
.count
&& xprt_connected(xprt
));
938 dprintk("RPC: tcp_data_recv done\n");
939 return len
- desc
.count
;
942 static void tcp_data_ready(struct sock
*sk
, int bytes
)
944 struct rpc_xprt
*xprt
;
945 read_descriptor_t rd_desc
;
947 dprintk("RPC: tcp_data_ready...\n");
948 if (!(xprt
= xprt_from_sock(sk
))) {
949 printk("RPC: tcp_data_ready socket info not found!\n");
955 /* We use rd_desc to pass struct xprt to tcp_data_recv */
956 rd_desc
.buf
= (char *)xprt
;
957 rd_desc
.count
= 65536;
958 tcp_read_sock(sk
, &rd_desc
, tcp_data_recv
);
962 tcp_state_change(struct sock
*sk
)
964 struct rpc_xprt
*xprt
;
966 if (!(xprt
= xprt_from_sock(sk
)))
968 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
969 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
970 sk
->state
, xprt_connected(xprt
),
971 sk
->dead
, sk
->zapped
);
974 case TCP_ESTABLISHED
:
975 if (xprt_test_and_set_connected(xprt
))
978 /* Reset TCP record info */
979 xprt
->tcp_offset
= 0;
980 xprt
->tcp_reclen
= 0;
981 xprt
->tcp_copied
= 0;
982 xprt
->tcp_flags
= XPRT_COPY_RECM
| XPRT_COPY_XID
;
984 spin_lock(&xprt
->sock_lock
);
985 if (xprt
->snd_task
&& xprt
->snd_task
->tk_rpcwait
== &xprt
->pending
)
986 rpc_wake_up_task(xprt
->snd_task
);
987 spin_unlock(&xprt
->sock_lock
);
993 xprt_disconnect(xprt
);
997 if (sk
->sleep
&& waitqueue_active(sk
->sleep
))
998 wake_up_interruptible_all(sk
->sleep
);
1002 * Called when more output buffer space is available for this socket.
1003 * We try not to wake our writers until they can make "significant"
1004 * progress, otherwise we'll waste resources thrashing sock_sendmsg
1005 * with a bunch of small requests.
1008 xprt_write_space(struct sock
*sk
)
1010 struct rpc_xprt
*xprt
;
1011 struct socket
*sock
;
1013 if (!(xprt
= xprt_from_sock(sk
)) || !(sock
= sk
->socket
))
1018 /* Wait until we have enough socket memory */
1020 /* from net/ipv4/tcp.c:tcp_write_space */
1021 if (tcp_wspace(sk
) < tcp_min_write_space(sk
))
1024 /* from net/core/sock.c:sock_def_write_space */
1025 if (!sock_writeable(sk
))
1029 if (!test_and_clear_bit(SOCK_NOSPACE
, &sock
->flags
))
1032 spin_lock_bh(&xprt
->sock_lock
);
1033 if (xprt
->snd_task
&& xprt
->snd_task
->tk_rpcwait
== &xprt
->pending
)
1034 rpc_wake_up_task(xprt
->snd_task
);
1035 spin_unlock_bh(&xprt
->sock_lock
);
1036 if (sk
->sleep
&& waitqueue_active(sk
->sleep
))
1037 wake_up_interruptible(sk
->sleep
);
1041 * Exponential backoff for UDP retries
1044 xprt_expbackoff(struct rpc_task
*task
, struct rpc_rqst
*req
)
1049 backoff
= min(rpc_ntimeo(&task
->tk_client
->cl_rtt
), XPRT_MAX_BACKOFF
);
1050 if (req
->rq_ntimeo
< (1 << backoff
))
1056 * RPC receive timeout handler.
1059 xprt_timer(struct rpc_task
*task
)
1061 struct rpc_rqst
*req
= task
->tk_rqstp
;
1062 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1064 spin_lock(&xprt
->sock_lock
);
1065 if (req
->rq_received
)
1068 if (!xprt
->nocong
) {
1069 if (xprt_expbackoff(task
, req
)) {
1070 rpc_add_timer(task
, xprt_timer
);
1073 rpc_inc_timeo(&task
->tk_client
->cl_rtt
);
1074 xprt_adjust_cwnd(req
->rq_xprt
, -ETIMEDOUT
);
1075 __xprt_put_cong(xprt
, req
);
1079 dprintk("RPC: %4d xprt_timer (%s request)\n",
1080 task
->tk_pid
, req
? "pending" : "backlogged");
1082 task
->tk_status
= -ETIMEDOUT
;
1084 task
->tk_timeout
= 0;
1085 rpc_wake_up_task(task
);
1087 spin_unlock(&xprt
->sock_lock
);
1091 * Place the actual RPC call.
1092 * We have to copy the iovec because sendmsg fiddles with its contents.
1095 xprt_transmit(struct rpc_task
*task
)
1097 struct rpc_rqst
*req
= task
->tk_rqstp
;
1098 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1100 dprintk("RPC: %4d xprt_transmit(%x)\n", task
->tk_pid
,
1101 *(u32
*)(req
->rq_svec
[0].iov_base
));
1104 task
->tk_status
= -EIO
;
1106 if (!xprt_connected(xprt
))
1107 task
->tk_status
= -ENOTCONN
;
1109 if (task
->tk_status
< 0)
1112 if (task
->tk_rpcwait
)
1113 rpc_remove_wait_queue(task
);
1115 /* set up everything as needed. */
1116 /* Write the record marker */
1118 u32
*marker
= req
->rq_svec
[0].iov_base
;
1120 *marker
= htonl(0x80000000|(req
->rq_slen
-sizeof(*marker
)));
1123 spin_lock_bh(&xprt
->sock_lock
);
1124 if (!__xprt_lock_write(xprt
, task
)) {
1125 spin_unlock_bh(&xprt
->sock_lock
);
1128 if (list_empty(&req
->rq_list
)) {
1129 list_add_tail(&req
->rq_list
, &xprt
->recv
);
1130 req
->rq_received
= 0;
1132 spin_unlock_bh(&xprt
->sock_lock
);
1134 do_xprt_transmit(task
);
1138 do_xprt_transmit(struct rpc_task
*task
)
1140 struct rpc_clnt
*clnt
= task
->tk_client
;
1141 struct rpc_rqst
*req
= task
->tk_rqstp
;
1142 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1143 int status
, retry
= 0;
1146 /* Continue transmitting the packet/record. We must be careful
1147 * to cope with writespace callbacks arriving _after_ we have
1148 * called xprt_sendmsg().
1151 req
->rq_xtime
= jiffies
;
1152 status
= xprt_sendmsg(xprt
, req
);
1158 req
->rq_bytes_sent
+= status
;
1160 if (req
->rq_bytes_sent
>= req
->rq_slen
)
1163 if (status
>= req
->rq_slen
)
1169 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1170 task
->tk_pid
, req
->rq_slen
- req
->rq_bytes_sent
,
1178 /* Note: at this point, task->tk_sleeping has not yet been set,
1179 * hence there is no danger of the waking up task being put on
1180 * schedq, and being picked up by a parallel run of rpciod().
1182 if (req
->rq_received
)
1185 task
->tk_status
= status
;
1189 if (test_bit(SOCK_ASYNC_NOSPACE
, &xprt
->sock
->flags
)) {
1190 /* Protect against races with xprt_write_space */
1191 spin_lock_bh(&xprt
->sock_lock
);
1192 if (test_bit(SOCK_NOSPACE
, &xprt
->sock
->flags
)) {
1193 task
->tk_timeout
= req
->rq_timeout
.to_current
;
1194 rpc_sleep_on(&xprt
->pending
, task
, NULL
, NULL
);
1196 spin_unlock_bh(&xprt
->sock_lock
);
1199 /* Keep holding the socket if it is blocked */
1200 rpc_delay(task
, HZ
>>4);
1204 if (!xprt
->stream
) {
1205 task
->tk_timeout
= RPC_REESTABLISH_TIMEOUT
;
1206 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
1212 xprt_disconnect(xprt
);
1213 req
->rq_bytes_sent
= 0;
1216 spin_lock_bh(&xprt
->sock_lock
);
1217 __xprt_release_write(xprt
, task
);
1218 __xprt_put_cong(xprt
, req
);
1219 spin_unlock_bh(&xprt
->sock_lock
);
1222 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
1223 /* Set the task's receive timeout value */
1224 if (!xprt
->nocong
) {
1225 task
->tk_timeout
= rpc_calc_rto(&clnt
->cl_rtt
,
1226 task
->tk_msg
.rpc_proc
->p_timer
);
1228 if (task
->tk_timeout
> req
->rq_timeout
.to_maxval
)
1229 task
->tk_timeout
= req
->rq_timeout
.to_maxval
;
1231 task
->tk_timeout
= req
->rq_timeout
.to_current
;
1232 spin_lock_bh(&xprt
->sock_lock
);
1233 if (!req
->rq_received
)
1234 rpc_sleep_on(&xprt
->pending
, task
, NULL
, xprt_timer
);
1235 __xprt_release_write(xprt
, task
);
1236 spin_unlock_bh(&xprt
->sock_lock
);
1240 * Reserve an RPC call slot.
1243 xprt_reserve(struct rpc_task
*task
)
1245 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1247 task
->tk_status
= -EIO
;
1248 if (!xprt
->shutdown
) {
1249 spin_lock(&xprt
->xprt_lock
);
1250 do_xprt_reserve(task
);
1251 spin_unlock(&xprt
->xprt_lock
);
1256 do_xprt_reserve(struct rpc_task
*task
)
1258 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1260 task
->tk_status
= 0;
1264 struct rpc_rqst
*req
= xprt
->free
;
1265 xprt
->free
= req
->rq_next
;
1266 req
->rq_next
= NULL
;
1267 task
->tk_rqstp
= req
;
1268 xprt_request_init(task
, xprt
);
1271 dprintk("RPC: waiting for request slot\n");
1272 task
->tk_status
= -EAGAIN
;
1273 task
->tk_timeout
= 0;
1274 rpc_sleep_on(&xprt
->backlog
, task
, NULL
, NULL
);
1278 * Initialize RPC request
1281 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1283 struct rpc_rqst
*req
= task
->tk_rqstp
;
1287 xid
= get_seconds() << 12;
1289 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
, req
, xid
);
1290 req
->rq_timeout
= xprt
->timeout
;
1291 req
->rq_task
= task
;
1292 req
->rq_xprt
= xprt
;
1293 req
->rq_xid
= xid
++;
1296 INIT_LIST_HEAD(&req
->rq_list
);
1300 * Release an RPC call slot
1303 xprt_release(struct rpc_task
*task
)
1305 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1306 struct rpc_rqst
*req
;
1308 if (!(req
= task
->tk_rqstp
))
1310 spin_lock_bh(&xprt
->sock_lock
);
1311 __xprt_release_write(xprt
, task
);
1312 __xprt_put_cong(xprt
, req
);
1313 if (!list_empty(&req
->rq_list
))
1314 list_del(&req
->rq_list
);
1315 spin_unlock_bh(&xprt
->sock_lock
);
1316 task
->tk_rqstp
= NULL
;
1317 memset(req
, 0, sizeof(*req
)); /* mark unused */
1319 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
1321 spin_lock(&xprt
->xprt_lock
);
1322 req
->rq_next
= xprt
->free
;
1325 xprt_clear_backlog(xprt
);
1326 spin_unlock(&xprt
->xprt_lock
);
1330 * Set default timeout parameters
1333 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1335 if (proto
== IPPROTO_UDP
)
1336 xprt_set_timeout(to
, 5, 5 * HZ
);
1338 xprt_set_timeout(to
, 5, 60 * HZ
);
1342 * Set constant timeout
1345 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1349 to
->to_increment
= incr
;
1350 to
->to_maxval
= incr
* retr
;
1351 to
->to_retries
= retr
;
1352 to
->to_exponential
= 0;
1356 * Initialize an RPC client
1358 static struct rpc_xprt
*
1359 xprt_setup(int proto
, struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1361 struct rpc_xprt
*xprt
;
1362 struct rpc_rqst
*req
;
1365 dprintk("RPC: setting up %s transport...\n",
1366 proto
== IPPROTO_UDP
? "UDP" : "TCP");
1368 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1370 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1374 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
1376 xprt
->cwnd
= RPC_MAXCWND
;
1379 xprt
->cwnd
= RPC_INITCWND
;
1380 spin_lock_init(&xprt
->sock_lock
);
1381 spin_lock_init(&xprt
->xprt_lock
);
1382 init_waitqueue_head(&xprt
->cong_wait
);
1384 INIT_LIST_HEAD(&xprt
->recv
);
1386 /* Set timeout parameters */
1388 xprt
->timeout
= *to
;
1389 xprt
->timeout
.to_current
= to
->to_initval
;
1391 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
1393 INIT_RPC_WAITQ(&xprt
->pending
, "xprt_pending");
1394 INIT_RPC_WAITQ(&xprt
->sending
, "xprt_sending");
1395 INIT_RPC_WAITQ(&xprt
->resend
, "xprt_resend");
1396 INIT_RPC_WAITQ(&xprt
->backlog
, "xprt_backlog");
1398 /* initialize free list */
1399 for (i
= 0, req
= xprt
->slot
; i
< RPC_MAXREQS
-1; i
++, req
++)
1400 req
->rq_next
= req
+ 1;
1401 req
->rq_next
= NULL
;
1402 xprt
->free
= xprt
->slot
;
1404 dprintk("RPC: created transport %p\n", xprt
);
1410 * Bind to a reserved port
1413 xprt_bindresvport(struct socket
*sock
)
1415 struct sockaddr_in myaddr
;
1418 memset(&myaddr
, 0, sizeof(myaddr
));
1419 myaddr
.sin_family
= AF_INET
;
1422 myaddr
.sin_port
= htons(port
);
1423 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
1425 } while (err
== -EADDRINUSE
&& --port
> 0);
1428 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
1434 xprt_bind_socket(struct rpc_xprt
*xprt
, struct socket
*sock
)
1436 struct sock
*sk
= sock
->sk
;
1441 sk
->user_data
= xprt
;
1442 xprt
->old_data_ready
= sk
->data_ready
;
1443 xprt
->old_state_change
= sk
->state_change
;
1444 xprt
->old_write_space
= sk
->write_space
;
1445 if (xprt
->prot
== IPPROTO_UDP
) {
1446 sk
->data_ready
= udp_data_ready
;
1447 sk
->no_check
= UDP_CSUM_NORCV
;
1448 xprt_set_connected(xprt
);
1450 struct tcp_opt
*tp
= tcp_sk(sk
);
1451 tp
->nonagle
= 1; /* disable Nagle's algorithm */
1452 sk
->data_ready
= tcp_data_ready
;
1453 sk
->state_change
= tcp_state_change
;
1454 xprt_clear_connected(xprt
);
1456 sk
->write_space
= xprt_write_space
;
1458 /* Reset to new socket */
1466 * Set socket buffer length
1469 xprt_sock_setbufsize(struct rpc_xprt
*xprt
)
1471 struct sock
*sk
= xprt
->inet
;
1475 if (xprt
->rcvsize
) {
1476 sk
->userlocks
|= SOCK_RCVBUF_LOCK
;
1477 sk
->rcvbuf
= xprt
->rcvsize
* RPC_MAXCONG
* 2;
1479 if (xprt
->sndsize
) {
1480 sk
->userlocks
|= SOCK_SNDBUF_LOCK
;
1481 sk
->sndbuf
= xprt
->sndsize
* RPC_MAXCONG
* 2;
1482 sk
->write_space(sk
);
1487 * Datastream sockets are created here, but xprt_connect will create
1488 * and connect stream sockets.
1490 static struct socket
*
1491 xprt_create_socket(int proto
, struct rpc_timeout
*to
, int resvport
)
1493 struct socket
*sock
;
1496 dprintk("RPC: xprt_create_socket(%s %d)\n",
1497 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
1499 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1501 if ((err
= sock_create(PF_INET
, type
, proto
, &sock
)) < 0) {
1502 printk("RPC: can't create socket (%d).\n", -err
);
1506 /* If the caller has the capability, bind to a reserved port */
1507 if (resvport
&& xprt_bindresvport(sock
) < 0) {
1508 printk("RPC: can't bind to reserved port.\n");
1520 * Create an RPC client transport given the protocol and peer address.
1523 xprt_create_proto(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1525 struct rpc_xprt
*xprt
;
1527 xprt
= xprt_setup(proto
, sap
, to
);
1531 xprt
->resvport
= capable(CAP_NET_BIND_SERVICE
) ? 1 : 0;
1532 if (!xprt
->stream
) {
1533 struct socket
*sock
;
1535 sock
= xprt_create_socket(proto
, to
, xprt
->resvport
);
1538 xprt_bind_socket(xprt
, sock
);
1541 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt
);
1544 dprintk("RPC: xprt_create_proto failed\n");
1551 * Prepare for transport shutdown.
1554 xprt_shutdown(struct rpc_xprt
*xprt
)
1557 rpc_wake_up(&xprt
->sending
);
1558 rpc_wake_up(&xprt
->resend
);
1559 rpc_wake_up(&xprt
->pending
);
1560 rpc_wake_up(&xprt
->backlog
);
1561 if (waitqueue_active(&xprt
->cong_wait
))
1562 wake_up(&xprt
->cong_wait
);
1566 * Clear the xprt backlog queue
1569 xprt_clear_backlog(struct rpc_xprt
*xprt
) {
1570 rpc_wake_up_next(&xprt
->backlog
);
1571 if (waitqueue_active(&xprt
->cong_wait
))
1572 wake_up(&xprt
->cong_wait
);
1577 * Destroy an RPC transport, killing off all requests.
1580 xprt_destroy(struct rpc_xprt
*xprt
)
1582 dprintk("RPC: destroying transport %p\n", xprt
);
1583 xprt_shutdown(xprt
);