2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
42 #define __KERNEL_SYSCALLS__
44 #include <linux/version.h>
45 #include <linux/config.h>
46 #include <linux/types.h>
47 #include <linux/malloc.h>
48 #include <linux/sched.h>
49 #include <linux/errno.h>
50 #include <linux/socket.h>
52 #include <linux/net.h>
54 #include <linux/udp.h>
55 #include <linux/unistd.h>
56 #include <linux/sunrpc/clnt.h>
57 #include <linux/file.h>
60 #include <net/checksum.h>
63 #include <asm/uaccess.h>
65 #define SOCK_HAS_USER_DATA
70 #ifndef SOCK_HAS_USER_DATA
71 static struct rpc_xprt
* sock_list
= NULL
;
75 # undef RPC_DEBUG_DATA
76 # define RPCDBG_FACILITY RPCDBG_XPRT
80 # define MAX(a, b) ((a) > (b)? (a) : (b))
81 # define MIN(a, b) ((a) < (b)? (a) : (b))
/*
 * Forward declarations of file-local (static) helpers.  Several of these
 * are passed as callbacks or called before their definitions appear
 * further down in this file (e.g. xprt_transmit_status, xprt_reconn_status,
 * xprt_reconn_timeout, xprt_create_socket).
 */
87 static void xprt_request_init(struct rpc_task
*, struct rpc_xprt
*);
88 static void xprt_transmit_status(struct rpc_task
*task
);
89 static void xprt_receive_status(struct rpc_task
*task
);
90 static void xprt_reserve_status(struct rpc_task
*task
);
91 static void xprt_reconn_timeout(struct rpc_task
*task
);
92 static void xprt_reconn_status(struct rpc_task
*task
);
93 static struct socket
*xprt_create_socket(int, struct sockaddr_in
*,
94 struct rpc_timeout
*);
98 * Print the buffer contents (first 128 bytes only--just enough for
102 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
104 u8
*buf
= (u8
*) packet
;
107 dprintk("RPC: %s\n", msg
);
108 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
112 dprintk("0x%04x ", j
);
114 dprintk("%02x%02x%02x%02x ",
115 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
121 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
128 * Look up RPC transport given an INET socket
130 static inline struct rpc_xprt
*
131 xprt_from_sock(struct sock
*sk
)
133 #ifndef SOCK_HAS_USER_DATA
134 struct rpc_xprt
*xprt
;
136 for (xprt
= sock_list
; xprt
&& sk
!= xprt
->inet
; xprt
= xprt
->link
)
140 return (struct rpc_xprt
*) sk
->user_data
;
145 * Adjust the iovec to move on 'n' bytes
148 extern inline void xprt_move_iov(struct msghdr
*msg
, struct iovec
*niv
, int amount
)
150 struct iovec
*iv
=msg
->msg_iov
;
153 * Eat any sent iovecs
156 while(iv
->iov_len
< amount
)
166 * And chew down the partial one
169 niv
[0].iov_len
= iv
->iov_len
-amount
;
170 niv
[0].iov_base
=((unsigned char *)iv
->iov_base
)+amount
;
174 * And copy any others
177 for(amount
=1;amount
<msg
->msg_iovlen
; amount
++)
184 * Write data to socket.
188 xprt_sendmsg(struct rpc_xprt
*xprt
)
190 struct socket
*sock
= xprt
->sock
;
194 struct iovec niv
[MAX_IOVEC
];
196 xprt_pktdump("packet data:",
197 xprt
->snd_buf
.io_vec
->iov_base
,
198 xprt
->snd_buf
.io_vec
->iov_len
);
200 msg
.msg_flags
= MSG_DONTWAIT
;
201 msg
.msg_iov
= xprt
->snd_buf
.io_vec
;
202 msg
.msg_iovlen
= xprt
->snd_buf
.io_nr
;
203 msg
.msg_name
= (struct sockaddr
*) &xprt
->addr
;
204 msg
.msg_namelen
= sizeof(xprt
->addr
);
205 msg
.msg_control
= NULL
;
206 msg
.msg_controllen
= 0;
208 /* Don't repeat bytes */
211 xprt_move_iov(&msg
, niv
, xprt
->snd_sent
);
213 oldfs
= get_fs(); set_fs(get_ds());
214 result
= sock_sendmsg(sock
, &msg
, xprt
->snd_buf
.io_len
);
217 dprintk("RPC: xprt_sendmsg(%d) = %d\n",
218 xprt
->snd_buf
.io_len
, result
);
221 xprt
->snd_buf
.io_len
-= result
;
222 xprt
->snd_sent
+= result
;
228 /* When the server has died, an ICMP port unreachable message
229 * prompts ECONNREFUSED.
234 case -ENOTCONN
: case -EPIPE
:
235 /* connection broken */
238 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
245 * Read data from socket
248 xprt_recvmsg(struct rpc_xprt
*xprt
, struct iovec
*iov
, int nr
, int len
)
250 struct socket
*sock
= xprt
->sock
;
251 struct sockaddr_in sin
;
256 #if LINUX_VERSION_CODE >= 0x020100
257 msg
.msg_flags
= MSG_DONTWAIT
;
261 msg
.msg_namelen
= sizeof(sin
);
262 msg
.msg_control
= NULL
;
263 msg
.msg_controllen
= 0;
265 oldfs
= get_fs(); set_fs(get_ds());
266 result
= sock_recvmsg(sock
, &msg
, len
, MSG_DONTWAIT
);
269 int alen
= sizeof(sin
);
274 msg
.msg_namelen
= sizeof(sin
);
275 msg
.msg_control
= NULL
;
276 msg
.msg_controllen
= 0;
278 oldfs
= get_fs(); set_fs(get_ds());
279 result
= sock
->ops
->recvmsg(sock
, &msg
, len
, 1, 0, &alen
);
283 dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
290 * Adjust RPC congestion window
291 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
294 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
296 unsigned long cwnd
= xprt
->cwnd
;
301 if (xprt
->cong
< cwnd
|| time_before(jiffies
, xprt
->congtime
))
303 /* The (cwnd >> 1) term makes sure
304 * the result gets rounded properly. */
305 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
306 if (cwnd
> RPC_MAXCWND
)
309 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
310 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 2) / RPC_CWNDSCALE
;
311 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
312 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
313 (xprt
->congtime
-jiffies
)*1000/HZ
);
314 } else if (result
== -ETIMEDOUT
) {
315 if ((cwnd
>>= 1) < RPC_CWNDSCALE
)
316 cwnd
= RPC_CWNDSCALE
;
317 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 3) / RPC_CWNDSCALE
;
318 dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
319 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
320 (xprt
->congtime
-jiffies
)*1000/HZ
);
321 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
328 * Adjust timeout values etc for next retransmit
331 xprt_adjust_timeout(struct rpc_timeout
*to
)
333 if (to
->to_exponential
)
334 to
->to_current
<<= 1;
336 to
->to_current
+= to
->to_increment
;
337 if (to
->to_maxval
&& to
->to_current
>= to
->to_maxval
) {
338 to
->to_current
= to
->to_maxval
;
341 if (!to
->to_current
) {
342 printk(KERN_WARNING
"xprt_adjust_timeout: to_current = 0!\n");
343 to
->to_current
= 5 * HZ
;
345 pprintk("RPC: %lu %s\n", jiffies
,
346 to
->to_retries
? "retrans" : "timeout");
347 return (to
->to_retries
)--;
351 * Close down a transport socket
354 xprt_close(struct rpc_xprt
*xprt
)
356 struct sock
*sk
= xprt
->inet
;
358 #ifdef SOCK_HAS_USER_DATA
359 sk
->user_data
= NULL
;
361 sk
->data_ready
= xprt
->old_data_ready
;
363 sk
->state_change
= xprt
->old_state_change
;
364 sk
->write_space
= xprt
->old_write_space
;
366 sock_release(xprt
->sock
);
368 * TCP doesn't require the rpciod now - other things may,
369 * but rpciod handles that, not us.
371 if(xprt
->stream
&& !xprt
->connecting
)
376 * Mark a transport as disconnected
379 xprt_disconnect(struct rpc_xprt
*xprt
)
381 dprintk("RPC: disconnected transport %p\n", xprt
);
382 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
383 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
388 * Reconnect a broken TCP connection.
391 xprt_reconnect(struct rpc_task
*task
)
393 struct rpc_xprt
*xprt
= task
->tk_xprt
;
398 dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
399 task
->tk_pid
, xprt
, xprt
->connected
);
402 if (xprt
->connecting
) {
403 task
->tk_timeout
= xprt
->timeout
.to_maxval
;
404 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
407 xprt
->connecting
= 1;
409 /* Create an unconnected socket */
410 if (!(sock
= xprt_create_socket(xprt
->prot
, NULL
, &xprt
->timeout
)))
413 #if LINUX_VERSION_CODE >= 0x020100
416 inet
= (struct sock
*) sock
->data
;
418 inet
->data_ready
= xprt
->inet
->data_ready
;
419 inet
->state_change
= xprt
->inet
->state_change
;
420 inet
->write_space
= xprt
->inet
->write_space
;
421 #ifdef SOCK_HAS_USER_DATA
422 inet
->user_data
= xprt
;
425 dprintk("RPC: %4d closing old socket\n", task
->tk_pid
);
426 xprt_disconnect(xprt
);
429 /* Reset to new socket and default congestion */
432 xprt
->cwnd
= RPC_INITCWND
;
434 /* Now connect it asynchronously. */
435 dprintk("RPC: %4d connecting new socket\n", task
->tk_pid
);
436 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
437 sizeof(xprt
->addr
), O_NONBLOCK
);
439 if (status
!= -EINPROGRESS
&& status
!= -EALREADY
) {
440 printk("RPC: TCP connect error %d!\n", -status
);
444 dprintk("RPC: %4d connect status %d connected %d\n",
445 task
->tk_pid
, status
, xprt
->connected
);
446 task
->tk_timeout
= 60 * HZ
;
449 if (!xprt
->connected
) {
450 rpc_sleep_on(&xprt
->reconn
, task
,
451 xprt_reconn_status
, xprt_reconn_timeout
);
458 xprt
->connecting
= 0;
459 rpc_wake_up(&xprt
->reconn
);
463 task
->tk_timeout
= 30 * HZ
;
464 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
465 xprt
->connecting
= 0;
472 xprt_reconn_status(struct rpc_task
*task
)
474 struct rpc_xprt
*xprt
= task
->tk_xprt
;
476 dprintk("RPC: %4d xprt_reconn_status %d\n",
477 task
->tk_pid
, task
->tk_status
);
478 if (!xprt
->connected
&& task
->tk_status
!= -ETIMEDOUT
) {
479 task
->tk_timeout
= 30 * HZ
;
480 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, xprt_reconn_timeout
);
485 * Reconnect timeout. We just mark the transport as not being in the
486 * process of reconnecting, and leave the rest to the upper layers.
489 xprt_reconn_timeout(struct rpc_task
*task
)
491 dprintk("RPC: %4d xprt_reconn_timeout %d\n",
492 task
->tk_pid
, task
->tk_status
);
493 task
->tk_status
= -ENOTCONN
;
494 task
->tk_xprt
->connecting
= 0;
495 task
->tk_timeout
= 0;
496 rpc_wake_up_task(task
);
500 * Look up the RPC request corresponding to a reply.
502 static inline struct rpc_rqst
*
503 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
505 struct rpc_task
*head
, *task
;
506 struct rpc_rqst
*req
;
509 if ((head
= xprt
->pending
.task
) != NULL
) {
512 if ((req
= task
->tk_rqstp
) && req
->rq_xid
== xid
)
514 task
= task
->tk_next
;
516 printk("xprt_lookup_rqst: loop in Q!\n");
519 } while (task
!= head
);
521 dprintk("RPC: unknown XID %08x in reply.\n", xid
);
526 * Complete reply received.
527 * The TCP code relies on us to remove the request from xprt->pending.
530 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
532 struct rpc_task
*task
= req
->rq_task
;
534 req
->rq_rlen
= copied
;
537 /* Adjust congestion window */
538 xprt_adjust_cwnd(xprt
, copied
);
541 /* Profile only reads for now */
543 static unsigned long nextstat
= 0;
544 static unsigned long pkt_rtt
= 0, pkt_len
= 0, pkt_cnt
= 0;
547 pkt_len
+= req
->rq_slen
+ copied
;
548 pkt_rtt
+= jiffies
- req
->rq_xtime
;
549 if (time_before(nextstat
, jiffies
)) {
550 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
551 printk("RPC: %ld %ld %ld %ld stat\n",
552 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
553 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
554 nextstat
= jiffies
+ 5 * HZ
;
559 /* ... and wake up the process. */
560 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
561 task
->tk_status
= copied
;
563 rpc_wake_up_task(task
);
567 /* We have set things up such that we perform the checksum of the UDP
568 * packet in parallel with the copies into the RPC client iovec. -DaveM
570 static int csum_partial_copy_to_page_cache(struct iovec
*iov
,
574 __u8
*pkt_data
= skb
->data
+ sizeof(struct udphdr
);
575 __u8
*cur_ptr
= iov
->iov_base
;
576 __kernel_size_t cur_len
= iov
->iov_len
;
577 unsigned int csum
= skb
->csum
;
578 int need_csum
= (skb
->ip_summed
!= CHECKSUM_UNNECESSARY
);
579 int slack
= skb
->len
- copied
- sizeof(struct udphdr
);
582 csum
= csum_partial(skb
->h
.raw
, sizeof(struct udphdr
), csum
);
585 int to_move
= cur_len
;
586 if (to_move
> copied
)
589 csum
= csum_partial_copy_nocheck(pkt_data
, cur_ptr
,
592 memcpy(cur_ptr
, pkt_data
, to_move
);
600 cur_len
= iov
->iov_len
;
601 cur_ptr
= iov
->iov_base
;
606 csum
= csum_partial(pkt_data
, slack
, csum
);
607 if ((unsigned short)csum_fold(csum
))
613 /* Input handler for RPC replies. Called from a bottom half and hence
617 udp_data_ready(struct sock
*sk
, int len
)
619 struct rpc_xprt
*xprt
;
620 struct rpc_rqst
*rovr
;
622 int err
, repsize
, copied
;
624 dprintk("RPC: udp_data_ready...\n");
625 if (!(xprt
= xprt_from_sock(sk
)))
627 dprintk("RPC: udp_data_ready client %p\n", xprt
);
629 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
632 repsize
= skb
->len
- sizeof(struct udphdr
);
634 printk("RPC: impossible RPC reply size %d!\n", repsize
);
638 /* Look up the request corresponding to the given XID */
639 if (!(rovr
= xprt_lookup_rqst(xprt
,
640 *(u32
*) (skb
->h
.raw
+ sizeof(struct udphdr
)))))
643 dprintk("RPC: %4d received reply\n", rovr
->rq_task
->tk_pid
);
644 xprt_pktdump("packet data:",
645 (u32
*) (skb
->h
.raw
+ sizeof(struct udphdr
)), repsize
);
647 if ((copied
= rovr
->rq_rlen
) > repsize
)
650 /* Suck it into the iovec, verify checksum if not done by hw. */
651 if (csum_partial_copy_to_page_cache(rovr
->rq_rvec
, skb
, copied
))
654 /* Something worked... */
655 dst_confirm(skb
->dst
);
657 xprt_complete_rqst(xprt
, rovr
, copied
);
660 skb_free_datagram(sk
, skb
);
665 * TCP record receive routine
666 * This is not the most efficient code since we call recvfrom twice--
667 * first receiving the record marker and XID, then the data.
669 * The optimal solution would be a RPC support in the TCP layer, which
670 * would gather all data up to the next record marker and then pass us
671 * the list of all TCP segments ready to be copied.
674 tcp_input_record(struct rpc_xprt
*xprt
)
676 struct rpc_rqst
*req
;
680 int result
, maxcpy
, reclen
, avail
, want
;
682 dprintk("RPC: tcp_input_record\n");
683 offset
= xprt
->tcp_offset
;
685 if (offset
< 4 || (!xprt
->tcp_more
&& offset
< 8)) {
686 want
= (xprt
->tcp_more
? 4 : 8) - offset
;
687 dprintk("RPC: reading header (%d bytes)\n", want
);
688 riov
.iov_base
= xprt
->tcp_recm
.data
+ offset
;
690 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
693 dprintk("RPC: empty TCP record.\n");
704 /* Get the record length and mask out the more_fragments bit */
705 reclen
= ntohl(xprt
->tcp_reclen
);
706 dprintk("RPC: reclen %08x\n", reclen
);
707 xprt
->tcp_more
= (reclen
& 0x80000000)? 0 : 1;
708 reclen
&= 0x7fffffff;
709 xprt
->tcp_total
+= reclen
;
710 xprt
->tcp_reclen
= reclen
;
712 dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
713 xprt
->tcp_xid
, xprt
->tcp_reclen
, xprt
->tcp_more
);
714 if (!xprt
->tcp_copied
715 && (req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
))) {
716 iov
= xprt
->tcp_iovec
;
717 memcpy(iov
, req
->rq_rvec
, req
->rq_rnr
* sizeof(iov
[0]));
719 *(u32
*)iov
->iov_base
= req
->rq_xid
;
723 xprt
->tcp_copied
= 4;
724 xprt
->tcp_rqstp
= req
;
727 reclen
= xprt
->tcp_reclen
;
730 avail
= reclen
- (offset
- 4);
731 if ((req
= xprt
->tcp_rqstp
) && req
->rq_xid
== xprt
->tcp_xid
732 && req
->rq_task
->tk_rpcwait
== &xprt
->pending
) {
733 want
= MIN(req
->rq_rlen
- xprt
->tcp_copied
, avail
);
735 dprintk("RPC: %4d TCP receiving %d bytes\n",
736 req
->rq_task
->tk_pid
, want
);
737 result
= xprt_recvmsg(xprt
, xprt
->tcp_iovec
, req
->rq_rnr
, want
);
742 xprt
->tcp_copied
+= result
;
750 maxcpy
= MIN(req
->rq_rlen
, xprt
->tcp_total
);
751 if (xprt
->tcp_copied
== maxcpy
&& !xprt
->tcp_more
) {
752 dprintk("RPC: %4d received reply complete\n",
753 req
->rq_task
->tk_pid
);
754 xprt_complete_rqst(xprt
, req
, xprt
->tcp_total
);
755 xprt
->tcp_copied
= 0;
756 xprt
->tcp_rqstp
= NULL
;
758 /* Request must be re-encoded before retransmit */
762 /* Skip over any trailing bytes on short reads */
766 want
= MIN(avail
, sizeof(dummy
));
767 riov
.iov_base
= dummy
;
769 dprintk("RPC: TCP skipping %d bytes\n", want
);
770 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
787 dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
788 offset
, xprt
->tcp_total
, xprt
->tcp_copied
);
789 xprt
->tcp_offset
= offset
;
794 * TCP task queue stuff
797 static struct rpc_xprt
*rpc_xprt_pending
= NULL
; /* Chain by rx_pending of rpc_xprt's */
800 * This is protected from tcp_data_ready and the stack as it's run
801 * inside of the RPC I/O daemon
804 void rpciod_tcp_dispatcher(void)
806 struct rpc_xprt
*xprt
;
809 dprintk("rpciod_tcp_dispatcher: Queue Running\n");
812 * Empty each pending socket
815 while((xprt
=rpc_xprt_pending
)!=NULL
)
819 rpc_xprt_pending
=xprt
->rx_pending
;
820 xprt
->rx_pending_flag
=0;
822 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt
);
826 if (safe_retry
++ > 50)
828 result
= tcp_input_record(xprt
);
837 xprt_disconnect(xprt
);
840 printk(KERN_WARNING
"RPC: unexpected error %d from tcp_input_record\n",
847 extern inline void tcp_rpciod_queue(void)
853 * data_ready callback for TCP. We can't just jump into the
854 * tcp recvmsg functions inside of the network receive bh or
855 * bad things occur. We queue it to pick up after networking
859 static void tcp_data_ready(struct sock
*sk
, int len
)
861 struct rpc_xprt
*xprt
;
863 dprintk("RPC: tcp_data_ready...\n");
864 if (!(xprt
= xprt_from_sock(sk
)))
866 printk("Not a socket with xprt %p\n", sk
);
869 dprintk("RPC: tcp_data_ready client %p\n", xprt
);
870 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
871 sk
->state
, xprt
->connected
,
872 sk
->dead
, sk
->zapped
);
874 * If we are not waiting for the RPC bh run then
877 if (!xprt
->rx_pending_flag
)
881 dprintk("RPC: xprt queue %p\n", rpc_xprt_pending
);
882 if(rpc_xprt_pending
==NULL
)
884 xprt
->rx_pending_flag
=1;
885 xprt
->rx_pending
=rpc_xprt_pending
;
886 rpc_xprt_pending
=xprt
;
894 dprintk("RPC: xprt queued already %p\n", xprt
);
899 tcp_state_change(struct sock
*sk
)
901 struct rpc_xprt
*xprt
;
903 if (!(xprt
= xprt_from_sock(sk
)))
905 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
906 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
907 sk
->state
, xprt
->connected
,
908 sk
->dead
, sk
->zapped
);
910 if (sk
->state
== TCP_ESTABLISHED
&& !xprt
->connected
) {
912 xprt
->connecting
= 0;
913 rpc_wake_up(&xprt
->reconn
);
914 } else if (sk
->zapped
) {
915 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
916 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
917 rpc_wake_up_status(&xprt
->reconn
, -ENOTCONN
);
922 tcp_write_space(struct sock
*sk
)
924 struct rpc_xprt
*xprt
;
926 if (!(xprt
= xprt_from_sock(sk
)))
928 if(xprt
->snd_sent
&& xprt
->snd_task
)
929 dprintk("RPC: write space\n");
930 if(xprt
->write_space
== 0)
932 xprt
->write_space
= 1;
933 if (xprt
->snd_task
&& !RPC_IS_RUNNING(xprt
->snd_task
))
936 dprintk("RPC: Write wakeup snd_sent =%d\n",
938 rpc_wake_up_task(xprt
->snd_task
);
944 * RPC receive timeout handler.
947 xprt_timer(struct rpc_task
*task
)
949 struct rpc_rqst
*req
= task
->tk_rqstp
;
952 xprt_adjust_cwnd(task
->tk_xprt
, -ETIMEDOUT
);
955 dprintk("RPC: %4d xprt_timer (%s request)\n",
956 task
->tk_pid
, req
? "pending" : "backlogged");
958 task
->tk_status
= -ETIMEDOUT
;
959 task
->tk_timeout
= 0;
960 rpc_wake_up_task(task
);
964 * (Partly) transmit the RPC packet
965 * Note that task->tk_status is either 0 or negative on return.
966 * Only when the reply is received will the status be set to a
970 xprt_transmit_some(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
972 struct rpc_rqst
*req
= task
->tk_rqstp
;
976 if ((result
= xprt_sendmsg(xprt
)) >= 0) {
977 if (!xprt
->snd_buf
.io_len
|| !xprt
->stream
) {
978 rpc_wake_up_next(&xprt
->sending
);
982 } else if (xprt
->stream
) {
983 if (result
== -ENOTCONN
|| result
== -EPIPE
) {
984 xprt_disconnect(xprt
);
988 return task
->tk_status
= result
;
992 * Place the actual RPC call.
993 * We have to copy the iovec because sendmsg fiddles with its contents.
996 xprt_transmit(struct rpc_task
*task
)
998 struct rpc_timeout
*timeo
;
999 struct rpc_rqst
*req
= task
->tk_rqstp
;
1000 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1003 dprintk("RPC: %4d xprt_transmit(%x)\n", task
->tk_pid
,
1004 *(u32
*)(req
->rq_svec
[0].iov_base
));
1006 if (xprt
->shutdown
) {
1007 task
->tk_status
= -EIO
;
1011 /* If we're not already in the process of transmitting our call,
1012 * set up everything as needed. */
1013 if (xprt
->snd_task
!= task
) {
1014 /* Write the record marker */
1018 if (!xprt
->connected
) {
1019 task
->tk_status
= -ENOTCONN
;
1022 marker
= htonl(0x80000000|(req
->rq_slen
-4));
1023 *((u32
*) req
->rq_svec
[0].iov_base
) = marker
;
1026 /* Reset timeout parameters */
1027 timeo
= &req
->rq_timeout
;
1028 if (timeo
->to_retries
< 0) {
1029 dprintk("RPC: %4d xprt_transmit reset timeo\n",
1031 timeo
->to_retries
= xprt
->timeout
.to_retries
;
1032 timeo
->to_current
= timeo
->to_initval
;
1036 req
->rq_xtime
= jiffies
;
1040 if (xprt
->snd_task
) {
1041 dprintk("RPC: %4d TCP write queue full (task %d)\n",
1042 task
->tk_pid
, xprt
->snd_task
->tk_pid
);
1043 rpc_sleep_on(&xprt
->sending
, task
,
1044 xprt_transmit_status
, NULL
);
1047 xprt
->snd_buf
= req
->rq_snd_buf
;
1048 xprt
->snd_task
= task
;
1052 /* For fast networks/servers we have to put the request on
1053 * the pending list now:
1056 status
= rpc_add_wait_queue(&xprt
->pending
, task
);
1058 task
->tk_callback
= NULL
;
1063 printk(KERN_WARNING
"RPC: failed to add task to queue: error: %d!\n", status
);
1064 task
->tk_status
= status
;
1068 /* Continue transmitting the packet/record. We must be careful
1069 * to cope with writespace callbacks arriving _after_ we have
1070 * called xprt_sendmsg().
1073 xprt
->write_space
= 0;
1074 if (xprt_transmit_some(xprt
, task
) != -EAGAIN
) {
1075 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
1076 xprt
->snd_task
= NULL
;
1080 /*d*/dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1081 task
->tk_pid
, xprt
->snd_buf
.io_len
,
1083 task
->tk_status
= 0;
1085 if (!xprt
->write_space
) {
1086 /* Remove from pending */
1087 rpc_remove_wait_queue(task
);
1088 rpc_sleep_on(&xprt
->sending
, task
,
1089 xprt_transmit_status
, NULL
);
1098 * This callback is invoked when the sending task is forced to sleep
1099 * because the TCP write buffers are full
1102 xprt_transmit_status(struct rpc_task
*task
)
1104 struct rpc_xprt
*xprt
= task
->tk_client
->cl_xprt
;
1106 dprintk("RPC: %4d transmit_status %d\n", task
->tk_pid
, task
->tk_status
);
1107 if (xprt
->snd_task
== task
)
1109 if (task
->tk_status
< 0)
1111 xprt
->snd_task
= NULL
;
1112 xprt_disconnect(xprt
);
1115 xprt_transmit(task
);
1120 * Wait for the reply to our call.
1121 * When the callback is invoked, the congestion window should have
1122 * been updated already.
1125 xprt_receive(struct rpc_task
*task
)
1127 struct rpc_rqst
*req
= task
->tk_rqstp
;
1128 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1130 dprintk("RPC: %4d xprt_receive\n", task
->tk_pid
);
1131 if (xprt
->connected
== 0) {
1132 task
->tk_status
= -ENOTCONN
;
1137 * Wait until rq_gotit goes non-null, or timeout elapsed.
1139 task
->tk_timeout
= req
->rq_timeout
.to_current
;
1142 if (!req
->rq_gotit
) {
1143 rpc_sleep_on(&xprt
->pending
, task
,
1144 xprt_receive_status
, xprt_timer
);
1148 dprintk("RPC: %4d xprt_receive returns %d\n",
1149 task
->tk_pid
, task
->tk_status
);
1153 xprt_receive_status(struct rpc_task
*task
)
1155 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1157 if (xprt
->stream
&& xprt
->tcp_rqstp
== task
->tk_rqstp
)
1158 xprt
->tcp_rqstp
= NULL
;
1162 * Reserve an RPC call slot.
1165 xprt_reserve(struct rpc_task
*task
)
1167 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1169 /* We already have an initialized request. */
1173 dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1174 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1175 if ((!RPCXPRT_CONGESTED(xprt
) && xprt
->free
)) {
1176 xprt_reserve_status(task
);
1177 task
->tk_timeout
= 0;
1178 } else if (!task
->tk_timeout
) {
1179 task
->tk_status
= -ENOBUFS
;
1181 dprintk("RPC: xprt_reserve waiting on backlog\n");
1182 rpc_sleep_on(&xprt
->backlog
, task
, xprt_reserve_status
, NULL
);
1184 dprintk("RPC: %4d xprt_reserve returns %d\n",
1185 task
->tk_pid
, task
->tk_status
);
1186 return task
->tk_status
;
1190 * Reservation callback
1193 xprt_reserve_status(struct rpc_task
*task
)
1195 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1196 struct rpc_rqst
*req
;
1198 if (xprt
->shutdown
) {
1199 task
->tk_status
= -EIO
;
1200 } else if (task
->tk_status
< 0) {
1202 } else if (task
->tk_rqstp
) {
1203 /* We've already been given a request slot: NOP */
1204 } else if (!RPCXPRT_CONGESTED(xprt
)) {
1205 /* OK: There's room for us. Grab a free slot and bump
1206 * congestion value */
1212 xprt
->free
= req
->rq_next
;
1213 xprt
->cong
+= RPC_CWNDSCALE
;
1214 task
->tk_rqstp
= req
;
1215 req
->rq_next
= NULL
;
1216 xprt_request_init(task
, xprt
);
1218 task
->tk_status
= -EAGAIN
;
1221 if (xprt
->free
&& !RPCXPRT_CONGESTED(xprt
))
1222 rpc_wake_up_next(&xprt
->backlog
);
1228 "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
1229 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1233 printk(KERN_ERR
"RPC: used rqst slot %p on free list!\n", req
);
1235 task
->tk_status
= -EIO
;
1241 * Initialize RPC request
1244 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1246 struct rpc_rqst
*req
= task
->tk_rqstp
;
1250 xid
= CURRENT_TIME
<< 12;
1252 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
, req
, xid
);
1253 task
->tk_status
= 0;
1255 req
->rq_timeout
= xprt
->timeout
;
1256 req
->rq_task
= task
;
1257 req
->rq_xprt
= xprt
;
1258 req
->rq_xid
= xid
++;
1264 * Release an RPC call slot
1267 xprt_release(struct rpc_task
*task
)
1269 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1270 struct rpc_rqst
*req
;
1272 if (!(req
= task
->tk_rqstp
))
1274 task
->tk_rqstp
= NULL
;
1275 memset(req
, 0, sizeof(*req
)); /* mark unused */
1277 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
1279 /* remove slot from queue of pending */
1281 if (task
->tk_rpcwait
) {
1282 printk("RPC: task of released request still queued!\n");
1284 printk("RPC: (task is on %s)\n", rpc_qname(task
->tk_rpcwait
));
1286 rpc_del_timer(task
);
1287 rpc_remove_wait_queue(task
);
1291 /* Decrease congestion value. */
1292 xprt
->cong
-= RPC_CWNDSCALE
;
1295 /* If congestion threshold is not yet reached, pass on the request slot.
1296 * This looks kind of kludgy, but it guarantees backlogged requests
1297 * are served in order.
1298 * N.B. This doesn't look completely safe, as the task is still
1299 * on the backlog list after wake-up.
1301 if (!RPCXPRT_CONGESTED(xprt
)) {
1302 struct rpc_task
*next
= rpc_wake_up_next(&xprt
->backlog
);
1304 if (next
&& next
->tk_rqstp
== 0) {
1305 xprt
->cong
+= RPC_CWNDSCALE
;
1306 next
->tk_rqstp
= req
;
1307 xprt_request_init(next
, xprt
);
1313 req
->rq_next
= xprt
->free
;
1316 /* If not congested, wake up the next backlogged process */
1317 if (!RPCXPRT_CONGESTED(xprt
))
1318 rpc_wake_up_next(&xprt
->backlog
);
1322 * Set default timeout parameters
1325 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1327 if (proto
== IPPROTO_UDP
)
1328 xprt_set_timeout(to
, 5, 5 * HZ
);
1330 xprt_set_timeout(to
, 5, 15 * HZ
);
1334 * Set constant timeout
1337 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1341 to
->to_increment
= incr
;
1342 to
->to_maxval
= incr
* retr
;
1343 to
->to_resrvval
= incr
* retr
;
1344 to
->to_retries
= retr
;
1345 to
->to_exponential
= 0;
1349 * Initialize an RPC client
1351 static struct rpc_xprt
*
1352 xprt_setup(struct socket
*sock
, int proto
,
1353 struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1355 struct rpc_xprt
*xprt
;
1356 struct rpc_rqst
*req
;
1360 dprintk("RPC: setting up %s transport...\n",
1361 proto
== IPPROTO_UDP
? "UDP" : "TCP");
1363 #if LINUX_VERSION_CODE >= 0x020100
1366 inet
= (struct sock
*) sock
->data
;
1369 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1371 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1378 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
1379 xprt
->cwnd
= RPC_INITCWND
;
1380 #ifdef SOCK_HAS_USER_DATA
1381 inet
->user_data
= xprt
;
1383 xprt
->link
= sock_list
;
1386 xprt
->old_data_ready
= inet
->data_ready
;
1387 xprt
->old_state_change
= inet
->state_change
;
1388 xprt
->old_write_space
= inet
->write_space
;
1389 if (proto
== IPPROTO_UDP
) {
1390 inet
->data_ready
= udp_data_ready
;
1391 inet
->no_check
= UDP_CSUM_NORCV
;
1393 inet
->data_ready
= tcp_data_ready
;
1394 inet
->state_change
= tcp_state_change
;
1395 inet
->write_space
= tcp_write_space
;
1398 xprt
->connected
= 1;
1400 /* Set timeout parameters */
1402 xprt
->timeout
= *to
;
1403 xprt
->timeout
.to_current
= to
->to_initval
;
1404 xprt
->timeout
.to_resrvval
= to
->to_maxval
<< 1;
1406 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
1409 xprt
->pending
= RPC_INIT_WAITQ("xprt_pending");
1410 xprt
->sending
= RPC_INIT_WAITQ("xprt_sending");
1411 xprt
->backlog
= RPC_INIT_WAITQ("xprt_backlog");
1412 xprt
->reconn
= RPC_INIT_WAITQ("xprt_reconn");
1414 /* initialize free list */
1415 for (i
= 0, req
= xprt
->slot
; i
< RPC_MAXREQS
-1; i
++, req
++)
1416 req
->rq_next
= req
+ 1;
1417 req
->rq_next
= NULL
;
1418 xprt
->free
= xprt
->slot
;
1420 dprintk("RPC: created transport %p\n", xprt
);
1423 * TCP requires that the rpc I/O daemon be present
1425 if(proto
==IPPROTO_TCP
)
1431 * Bind to a reserved port
1434 xprt_bindresvport(struct socket
*sock
)
1436 struct sockaddr_in myaddr
;
1439 memset(&myaddr
, 0, sizeof(myaddr
));
1440 myaddr
.sin_family
= AF_INET
;
1443 myaddr
.sin_port
= htons(port
);
1444 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
1446 } while (err
== -EADDRINUSE
&& --port
> 0);
1449 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
1455 * Create a client socket given the protocol and peer address.
1457 static struct socket
*
1458 xprt_create_socket(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1460 struct socket
*sock
;
1463 dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
1464 sap
? ntohl(sap
->sin_addr
.s_addr
) : 0,
1465 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
1467 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1468 if ((err
= sock_create(PF_INET
, type
, proto
, &sock
)) < 0) {
1469 printk("RPC: can't create socket (%d).\n", -err
);
1473 /* If the caller has root privs, bind to a reserved port */
1474 if (!current
->fsuid
&& xprt_bindresvport(sock
) < 0)
1477 if (type
== SOCK_STREAM
&& sap
) {
1478 err
= sock
->ops
->connect(sock
, (struct sockaddr
*) sap
,
1481 printk("RPC: TCP connect failed (%d).\n", -err
);
1494 * Create an RPC client transport given the protocol and peer address.
1497 xprt_create_proto(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1499 struct socket
*sock
;
1500 struct rpc_xprt
*xprt
;
1502 dprintk("RPC: xprt_create_proto called\n");
1504 if (!(sock
= xprt_create_socket(proto
, sap
, to
)))
1507 if (!(xprt
= xprt_setup(sock
, proto
, sap
, to
)))
1514 * Prepare for transport shutdown.
1517 xprt_shutdown(struct rpc_xprt
*xprt
)
1520 rpc_wake_up(&xprt
->sending
);
1521 rpc_wake_up(&xprt
->pending
);
1522 rpc_wake_up(&xprt
->backlog
);
1523 rpc_wake_up(&xprt
->reconn
);
1527 * Destroy an RPC transport, killing off all requests.
1530 xprt_destroy(struct rpc_xprt
*xprt
)
1532 #ifndef SOCK_HAS_USER_DATA
1533 struct rpc_xprt
**q
;
1535 for (q
= &sock_list
; *q
&& *q
!= xprt
; q
= &((*q
)->link
))
1538 printk(KERN_WARNING
"xprt_destroy: unknown socket!\n");
1539 return -EIO
; /* why is there no EBUGGYSOFTWARE */
1544 dprintk("RPC: destroying transport %p\n", xprt
);