/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue.
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_call().
 *  -   xprt_call transmits the message and installs the caller on the
 *      socket's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that socket. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
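/*
 * Illustrative call flow (a sketch for orientation, not part of the
 * original source): the RPC client layer drives this file roughly as
 *
 *      xprt_reserve(task);     - grab a request slot, or sleep on the backlog
 *      xprt_transmit(task);    - send the message, queue the task for a reply
 *      ...                     - task sleeps until data_ready or xprt_timer fires
 *      xprt_release(task);     - give the slot back, wake up the backlog
 *
 * The functions named here are the ones defined below; the ordering is
 * inferred from the interface description above.
 */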
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/net.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/checksum.h>

#include <asm/uaccess.h>
/* Following value should be > 32k + RPC overhead */
#define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)
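/*
 * Illustrative breakdown (an assumption, not documented in the original):
 * 32 * 1024 = 32768 bytes of payload plus 2232 bytes of slack for RPC and
 * transport headers gives the 35000 above.
 */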
extern spinlock_t rpc_queue_lock;

/* Spinlock for critical sections in the code. */
spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)      ((a) > (b)? (a) : (b))
# define MIN(a, b)      ((a) < (b)? (a) : (b))
#endif
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     do_xprt_transmit(struct rpc_task *);
static void     xprt_reserve_status(struct rpc_task *task);
static void     xprt_disconnect(struct rpc_xprt *);
static void     xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int      xprt_bind_socket(struct rpc_xprt *, struct socket *);
static void     xprt_remove_pending(struct rpc_xprt *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * debugging).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8      *buf = (u8 *) packet;
        int     j;

        dprintk("RPC:      %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
        return (struct rpc_xprt *) sk->user_data;
}
/*
 * Adjust the iovec to move on 'n' bytes
 */
static inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
        struct iovec *iv = msg->msg_iov;
        int i;

        /*
         * Eat any sent iovecs
         */
        while (iv->iov_len <= amount) {
                amount -= iv->iov_len;
                iv++;
                msg->msg_iovlen--;
        }

        /*
         * And chew down the partial one
         */
        niv[0].iov_len = iv->iov_len-amount;
        niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
        iv++;

        /*
         * And copy any others
         */
        for (i = 1; i < msg->msg_iovlen; i++)
                niv[i] = *iv++;

        msg->msg_iov = niv;
}
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        mm_segment_t    oldfs;
        int             result;
        int             slen = req->rq_slen - req->rq_bytes_sent;
        struct iovec    niv[MAX_IOVEC];

        xprt_pktdump("packet data:",
                                req->rq_svec->iov_base,
                                req->rq_svec->iov_len);

        msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
        msg.msg_iov     = req->rq_svec;
        msg.msg_iovlen  = req->rq_snr;
        msg.msg_name    = (struct sockaddr *) &xprt->addr;
        msg.msg_namelen = sizeof(xprt->addr);
        msg.msg_control = NULL;
        msg.msg_controllen = 0;

        /* Don't repeat bytes */
        if (req->rq_bytes_sent)
                xprt_move_iov(&msg, niv, req->rq_bytes_sent);

        oldfs = get_fs(); set_fs(get_ds());
        result = sock_sendmsg(sock, &msg, slen);
        set_fs(oldfs);

        dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);

        if (result >= 0)
                return result;

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED.
                 */
        case -EAGAIN:
                if (test_bit(SOCK_NOSPACE, &sock->flags))
                        result = -ENOMEM;
                break;
        case -ENOTCONN:
        case -EPIPE:
                /* connection broken */
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
        }
        return result;
}
/*
 * Read data from socket
 */
static int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        mm_segment_t    oldfs;
        struct iovec    niv[MAX_IOVEC];
        int             result;

        if (!sock)
                return -ENOTCONN;

        msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
        msg.msg_iov     = iov;
        msg.msg_iovlen  = nr;
        msg.msg_name    = NULL;
        msg.msg_namelen = 0;
        msg.msg_control = NULL;
        msg.msg_controllen = 0;

        /* Adjust the iovec if we've already filled it */
        if (shift)
                xprt_move_iov(&msg, niv, shift);

        oldfs = get_fs(); set_fs(get_ds());
        result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
        set_fs(oldfs);

        dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
                                                iov, len, result);
        return result;
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long   cwnd;

        if (xprt->nocong)
                return;
        spin_lock_bh(&xprt_sock_lock);
        cwnd = xprt->cwnd;
        if (result >= 0) {
                if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
                        goto out;
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND)
                        cwnd = RPC_MAXCWND;
                else
                        pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
                xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
                dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
                        "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
                        (xprt->congtime-jiffies)*1000/HZ);
        } else if (result == -ETIMEDOUT) {
                if ((cwnd >>= 1) < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
                xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
                dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "
                        "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
                        (xprt->congtime-jiffies)*1000/HZ);
                pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
        }

        xprt->cwnd = cwnd;
 out:
        spin_unlock_bh(&xprt_sock_lock);
}
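/*
 * Worked example (illustrative; assumes RPC_CWNDSCALE = 256 as in the
 * sunrpc headers of this era): with cwnd = 2 * RPC_CWNDSCALE = 512, one
 * good reply grows the window by
 *
 *      (256 * 256 + 512 / 2) / 512 = 128,
 *
 * roughly one slot (RPC_CWNDSCALE) per full window of replies -- additive
 * increase -- while a timeout halves cwnd: multiplicative decrease, the
 * same AIMD shape TCP uses.
 */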
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
        if (to->to_retries > 0) {
                if (to->to_exponential)
                        to->to_current <<= 1;
                else
                        to->to_current += to->to_increment;
                if (to->to_maxval && to->to_current >= to->to_maxval)
                        to->to_current = to->to_maxval;
        } else {
                if (to->to_exponential)
                        to->to_initval <<= 1;
                else
                        to->to_initval += to->to_increment;
                if (to->to_maxval && to->to_initval >= to->to_maxval)
                        to->to_initval = to->to_maxval;
                to->to_current = to->to_initval;
        }

        if (!to->to_current) {
                printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
                to->to_current = 5 * HZ;
        }
        pprintk("RPC: %lu %s\n", jiffies,
                        to->to_retries? "retrans" : "timeout");
        return to->to_retries-- > 0;
}
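/*
 * Example (illustrative, assuming HZ = 100): a UDP transport set up via
 * xprt_default_timeout() starts at to_current = 5 seconds. With
 * to_exponential = 0, each minor timeout adds to_increment (5s), so the
 * retransmit intervals run 5s, 10s, 15s, ... capped at to_maxval; with
 * to_exponential set they would double instead: 5s, 10s, 20s, ...
 */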
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
        struct socket   *sock = xprt->sock;
        struct sock     *sk = xprt->inet;

        if (!sk)
                return;

        xprt->inet = NULL;
        xprt->sock = NULL;

        sk->user_data    = NULL;
        sk->data_ready   = xprt->old_data_ready;
        sk->state_change = xprt->old_state_change;
        sk->write_space  = xprt->old_write_space;

        xprt_disconnect(xprt);
        sk->no_check     = 0;

        sock_release(sock);
        /*
         * TCP doesn't require the rpciod now - other things may
         * but rpciod handles that not us.
         */
        if (xprt->stream)
                rpciod_down();
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC:      disconnected transport %p\n", xprt);
        xprt_clear_connected(xprt);
        xprt_remove_pending(xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct socket   *sock = xprt->sock;
        struct sock     *inet = xprt->inet;
        int             status;

        dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
                                task->tk_pid, xprt, xprt_connected(xprt));
        if (xprt->shutdown)
                return;
        if (!xprt->stream)
                return;
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }

        spin_lock(&xprt_lock);
        if (xprt->connecting) {
                task->tk_timeout = 0;
                rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
                spin_unlock(&xprt_lock);
                return;
        }
        xprt->connecting = 1;
        spin_unlock(&xprt_lock);

        status = -ENOTCONN;
        if (!inet) {
                /* Create an unconnected socket */
                if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
                        goto defer;
                xprt_bind_socket(xprt, sock);
                inet = sock->sk;
        }

        xprt_disconnect(xprt);

        /* Reset TCP record info */
        xprt->tcp_offset = 0;
        xprt->tcp_copied = 0;
        xprt->tcp_more = 0;

        /* Now connect it asynchronously. */
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                                sizeof(xprt->addr), O_NONBLOCK);
        if (status < 0) {
                switch (status) {
                case -EALREADY:
                case -EINPROGRESS:
                        status = 0;
                        break;
                default:
                        printk("RPC: TCP connect error %d!\n", -status);
                        goto defer;
                }
        }

        dprintk("RPC: %4d connect status %d connected %d\n",
                task->tk_pid, status, xprt_connected(xprt));

        spin_lock_bh(&xprt_sock_lock);
        if (!xprt_connected(xprt)) {
                task->tk_timeout = xprt->timeout.to_maxval;
                rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
                spin_unlock_bh(&xprt_sock_lock);
                return;
        }
        spin_unlock_bh(&xprt_sock_lock);

 defer:
        spin_lock(&xprt_lock);
        xprt->connecting = 0;
        if (status < 0) {
                rpc_delay(task, 5*HZ);
                task->tk_status = -ENOTCONN;
        }
        rpc_wake_up(&xprt->reconn);
        spin_unlock(&xprt_lock);
}
/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_reconn_timeout %d\n",
                                task->tk_pid, task->tk_status);

        spin_lock(&xprt_lock);
        xprt->connecting = 0;
        rpc_wake_up(&xprt->reconn);
        spin_unlock(&xprt_lock);
}
/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct rpc_task *head, *task;
        struct rpc_rqst *req;
        int             safe = 0;

        spin_lock_bh(&rpc_queue_lock);
        if ((head = xprt->pending.task) != NULL) {
                task = head;
                do {
                        if ((req = task->tk_rqstp) && req->rq_xid == xid)
                                goto out;
                        task = task->tk_next;
                        if (++safe > 100) {
                                printk("xprt_lookup_rqst: loop in Q!\n");
                                goto out_bad;
                        }
                } while (task != head);
        }
        dprintk("RPC:      unknown XID %08x in reply.\n", xid);
 out_bad:
        req = NULL;
 out:
        if (req && !__rpc_lock_task(req->rq_task))
                req = NULL;
        spin_unlock_bh(&rpc_queue_lock);
        return req;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;

        /* Adjust congestion window */
        xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long    nextstat = 0;
                static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        task->tk_status = copied;
        req->rq_received = 1;

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
}
/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int csum_partial_copy_to_page_cache(struct iovec *iov,
                                           struct sk_buff *skb,
                                           int copied)
{
        __u8 *pkt_data = skb->h.raw + sizeof(struct udphdr);
        __u8 *cur_ptr = iov->iov_base;
        __kernel_size_t cur_len = iov->iov_len;
        unsigned int csum = skb->csum;
        int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
        int slack = skb->len - copied - sizeof(struct udphdr);

        if (need_csum)
                csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
        while (copied > 0) {
                int to_move = cur_len;
                if (to_move > copied)
                        to_move = copied;
                if (need_csum)
                        csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
                                                         to_move, csum);
                else
                        memcpy(cur_ptr, pkt_data, to_move);
                pkt_data += to_move;
                copied -= to_move;
                cur_ptr += to_move;
                cur_len -= to_move;
                if (cur_len <= 0) {
                        iov++;
                        cur_len = iov->iov_len;
                        cur_ptr = iov->iov_base;
                }
        }
        if (need_csum) {
                if (slack > 0)
                        csum = csum_partial(pkt_data, slack, csum);
                if ((unsigned short)csum_fold(csum))
                        return -1;
        }
        return 0;
}
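/*
 * Note (descriptive): csum_fold() collapses the 32-bit running sum into
 * the 16-bit ones-complement form used on the wire; a nonzero folded
 * value over header, copied data and trailing slack means the UDP
 * checksum does not verify, so the -1 return marks a corrupt datagram.
 */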
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
        struct rpc_task *task;
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff  *skb;
        int err, repsize, copied;

        dprintk("RPC:      udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk))) {
                printk("RPC:      udp_data_ready request not found!\n");
                goto out;
        }

        dprintk("RPC:      udp_data_ready client %p\n", xprt);

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                goto out;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                printk("RPC: impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Look up and lock the request corresponding to the given XID */
        rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
        if (!rovr)
                goto dropit;
        task = rovr->rq_task;

        dprintk("RPC: %4d received reply\n", task->tk_pid);
        xprt_pktdump("packet data:",
                     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

        if ((copied = rovr->rq_rlen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
                goto out_unlock;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
        rpc_unlock_task(task);

 dropit:
        skb_free_datagram(sk, skb);
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
}
/*
 * TCP read fragment marker
 */
static inline int
tcp_read_fraghdr(struct rpc_xprt *xprt)
{
        struct iovec    riov;
        int             want, result;

        if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
                xprt->tcp_offset = 0;
                xprt->tcp_reclen = 0;
        }
        if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
                goto done;

        want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
        dprintk("RPC:      reading header (%d bytes)\n", want);
        do {
                riov.iov_base = ((u8 *) &xprt->tcp_recm) + xprt->tcp_offset;
                riov.iov_len  = want;
                result = xprt_recvmsg(xprt, &riov, 1, want, 0);
                if (result < 0)
                        return result;
                xprt->tcp_offset += result;
                want -= result;
        } while (want);

 done:
        /* Is this another fragment in the last message */
        if (!xprt->tcp_more)
                xprt->tcp_copied = 0; /* No, so we're reading a new message */

        /* Get the record length and mask out the last fragment bit */
        xprt->tcp_reclen = ntohl(xprt->tcp_recm);
        xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
        xprt->tcp_reclen &= 0x7fffffff;

        dprintk("RPC:      New record reclen %d morefrags %d\n",
                                xprt->tcp_reclen, xprt->tcp_more);
        return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
}
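/*
 * Worked example (illustrative): RFC 1831 record marking prefixes each
 * fragment with a 4-byte marker whose top bit is the last-fragment flag
 * and whose low 31 bits are the fragment length. A marker of 0x800000f4
 * thus parses, after the masking above, as tcp_more = 0 (last fragment)
 * and tcp_reclen = 0xf4 = 244 bytes.
 */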
/*
 * TCP read xid
 */
static inline int
tcp_read_xid(struct rpc_xprt *xprt, int avail)
{
        struct iovec    riov;
        int             want, result;

        if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
                goto done;
        want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
        do {
                dprintk("RPC:      reading xid (%d bytes)\n", want);
                riov.iov_base = ((u8 *) &xprt->tcp_xid) + xprt->tcp_copied;
                riov.iov_len  = want;
                result = xprt_recvmsg(xprt, &riov, 1, want, 0);
                if (result < 0)
                        return result;
                xprt->tcp_copied += result;
                xprt->tcp_offset += result;
                want  -= result;
                avail -= result;
        } while (want);

 done:
        return avail;
}
/*
 * TCP read and complete request
 */
static inline int
tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
{
        int     want, result;

        if (req->rq_rlen <= xprt->tcp_copied || !avail)
                goto done;
        want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
        do {
                dprintk("RPC: %4d TCP receiving %d bytes\n",
                        req->rq_task->tk_pid, want);

                result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
                if (result < 0)
                        return result;
                xprt->tcp_copied += result;
                xprt->tcp_offset += result;
                want  -= result;
                avail -= result;
        } while (want);

 done:
        /* Is the record complete, or do more fragments follow? */
        if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
                return avail;
        dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
        xprt_complete_rqst(xprt, req, xprt->tcp_copied);

        return avail;
}
/*
 * TCP discard extra bytes from a short read
 */
static inline int
tcp_read_discard(struct rpc_xprt *xprt, int avail)
{
        struct iovec    riov;
        static u8       dummy[64];
        int             want, result = 0;

        while (avail) {
                want = MIN(avail, sizeof(dummy));
                riov.iov_base = dummy;
                riov.iov_len  = want;
                dprintk("RPC:      TCP skipping %d bytes\n", want);
                result = xprt_recvmsg(xprt, &riov, 1, want, 0);
                if (result < 0)
                        return result;
                xprt->tcp_offset += result;
                avail -= result;
        }
        return avail;
}
/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom thrice--
 * first receiving the record marker, then the XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static int
tcp_input_record(struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = NULL;
        struct rpc_task *task = NULL;
        int             avail, result;

        dprintk("RPC:      tcp_input_record\n");

        if (xprt->shutdown)
                return -EIO;
        if (!xprt_connected(xprt))
                return -ENOTCONN;

        /* Read in a new fragment marker if necessary */
        /* Can we ever really expect to get completely empty fragments? */
        if ((result = tcp_read_fraghdr(xprt)) <= 0)
                return result;
        avail = result;

        /* Read in the xid if necessary */
        if ((result = tcp_read_xid(xprt, avail)) <= 0)
                return result;
        avail = result;

        /* Find and lock the request corresponding to this xid */
        req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
        if (req) {
                task = req->rq_task;
                /* Read in the request data */
                result = tcp_read_request(xprt, req, avail);
                rpc_unlock_task(task);
                if (result < 0)
                        return result;
                avail = result;
        }

        /* Skip over any trailing bytes on short reads */
        if ((result = tcp_read_discard(xprt, avail)) < 0)
                return result;

        dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
                        xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
        result = xprt->tcp_reclen;
        return result;
}
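/*
 * Summary (descriptive): a record is consumed in three steps -- marker,
 * XID, body -- with tcp_offset and tcp_copied persisting across calls,
 * so a record split over several data_ready events resumes where the
 * previous pass stopped; anything unclaimed is drained by
 * tcp_read_discard().
 */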
/*
 * TCP task queue stuff
 */
LIST_HEAD(rpc_xprt_pending);    /* List of xprts having pending tcp requests */

static inline
void tcp_rpciod_queue(void)
{
        rpciod_wake_up();
}

void xprt_append_pending(struct rpc_xprt *xprt)
{
        if (!list_empty(&xprt->rx_pending))
                return;
        spin_lock_bh(&rpc_queue_lock);
        if (list_empty(&xprt->rx_pending)) {
                list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
                dprintk("RPC:     xprt queue %p\n", xprt);
                tcp_rpciod_queue();
        }
        spin_unlock_bh(&rpc_queue_lock);
}
static
void xprt_remove_pending(struct rpc_xprt *xprt)
{
        spin_lock_bh(&rpc_queue_lock);
        if (!list_empty(&xprt->rx_pending)) {
                list_del(&xprt->rx_pending);
                INIT_LIST_HEAD(&xprt->rx_pending);
        }
        spin_unlock_bh(&rpc_queue_lock);
}
static
struct rpc_xprt *xprt_remove_pending_next(void)
{
        struct rpc_xprt *xprt = NULL;

        spin_lock_bh(&rpc_queue_lock);
        if (!list_empty(&rpc_xprt_pending)) {
                xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
                list_del(&xprt->rx_pending);
                INIT_LIST_HEAD(&xprt->rx_pending);
        }
        spin_unlock_bh(&rpc_queue_lock);
        return xprt;
}
/*
 * This is protected from tcp_data_ready and the stack as it's run
 * inside of the RPC I/O daemon
 */
void
__rpciod_tcp_dispatcher(void)
{
        struct rpc_xprt *xprt;
        int safe_retry = 0, result;

        dprintk("rpciod_tcp_dispatcher: Queue Running\n");

        /*
         * Empty each pending socket
         */
        while ((xprt = xprt_remove_pending_next()) != NULL) {
                dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

                do {
                        result = tcp_input_record(xprt);
                } while (result >= 0);

                if (safe_retry++ > 200) {
                        schedule();
                        safe_retry = 0;
                }
        }
}
/*
 * data_ready callback for TCP. We can't just jump into the
 * tcp recvmsg functions inside of the network receive bh or
 * bad things occur. We queue it to pick up after networking
 * is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
        struct rpc_xprt *xprt;

        dprintk("RPC:      tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
        {
                printk("Not a socket with xprt %p\n", sk);
                goto out;
        }

        if (xprt->shutdown)
                goto out;

        xprt_append_pending(xprt);

        dprintk("RPC:      tcp_data_ready client %p\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->state, xprt_connected(xprt),
                                sk->dead, sk->zapped);
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
}
static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        if (!(xprt = xprt_from_sock(sk)))
                goto out;
        dprintk("RPC:      tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->state, xprt_connected(xprt),
                                sk->dead, sk->zapped);

        switch (sk->state) {
        case TCP_ESTABLISHED:
                if (xprt_test_and_set_connected(xprt))
                        break;

                spin_lock_bh(&xprt_sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
                        rpc_wake_up_task(xprt->snd_task);
                rpc_wake_up(&xprt->reconn);
                spin_unlock_bh(&xprt_sock_lock);
                break;
        default:
                xprt_disconnect(xprt);
                break;
        }
 out:
        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible_all(sk->sleep);
}
/*
 * The following 2 routines allow a task to sleep while socket memory is
 * low.
 */
static void
tcp_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;
        struct socket   *sock;

        if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
                return;
        if (xprt->shutdown)
                return;

        /* Wait until we have enough socket memory */
        if (!sock_writeable(sk))
                return;

        if (!xprt_test_and_set_wspace(xprt)) {
                spin_lock_bh(&xprt_sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
                        rpc_wake_up_task(xprt->snd_task);
                spin_unlock_bh(&xprt_sock_lock);
        }

        if (test_bit(SOCK_NOSPACE, &sock->flags)) {
                if (sk->sleep && waitqueue_active(sk->sleep)) {
                        clear_bit(SOCK_NOSPACE, &sock->flags);
                        wake_up_interruptible(sk->sleep);
                }
        }
}
static void
udp_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;

        if (!(xprt = xprt_from_sock(sk)))
                return;
        if (xprt->shutdown)
                return;

        /* Wait until we have enough socket memory */
        if (sock_wspace(sk) < min(sk->sndbuf, XPRT_MIN_WRITE_SPACE))
                return;

        if (!xprt_test_and_set_wspace(xprt)) {
                spin_lock_bh(&xprt_sock_lock);
                if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
                        rpc_wake_up_task(xprt->snd_task);
                spin_unlock_bh(&xprt_sock_lock);
        }

        if (sk->sleep && waitqueue_active(sk->sleep))
                wake_up_interruptible(sk->sleep);
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req)
                xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
}
/*
 * Serialize access to sockets, in order to prevent different
 * requests from interfering with each other.
 */
static int
xprt_down_transmit(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        spin_lock(&xprt_lock);
        if (xprt->snd_task && xprt->snd_task != task) {
                dprintk("RPC: %4d TCP write queue full (task %d)\n",
                        task->tk_pid, xprt->snd_task->tk_pid);
                task->tk_timeout = 0;
                task->tk_status = -EAGAIN;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        } else if (!xprt->snd_task) {
                xprt->snd_task = task;
                req->rq_xtime = jiffies;
                req->rq_bytes_sent = 0;
        }
        spin_unlock(&xprt_lock);
        return xprt->snd_task == task;
}
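/*
 * Note (descriptive): xprt_down_transmit() and xprt_up_transmit() act as
 * a per-transport transmit mutex. The task that finds snd_task free
 * claims it; later tasks queue on xprt->sending with -EAGAIN and are
 * woken one at a time by rpc_wake_up_next() when the socket is released.
 */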
/*
 * Releases the socket for use by other requests.
 */
static void
xprt_up_transmit(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

        if (xprt->snd_task && xprt->snd_task == task) {
                spin_lock(&xprt_lock);
                xprt->snd_task = NULL;
                rpc_wake_up_next(&xprt->sending);
                spin_unlock(&xprt_lock);
        }
}
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
                                *(u32 *)(req->rq_svec[0].iov_base));

        if (xprt->shutdown)
                task->tk_status = -EIO;

        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;

        if (task->tk_status < 0)
                return;

        if (task->tk_rpcwait)
                rpc_remove_wait_queue(task);

        /* set up everything as needed. */
        /* Write the record marker */
        if (xprt->stream) {
                u32     *marker = req->rq_svec[0].iov_base;

                *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
        }

        if (!xprt_down_transmit(task))
                return;

        do_xprt_transmit(task);
}
static void
do_xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status, retry = 0;

        /* For fast networks/servers we have to put the request on
         * the pending list now:
         * Note that we don't want the task timing out during the
         * call to xprt_sendmsg(), so we initially disable the timeout,
         * and then reset it later...
         */
        xprt_receive(task);

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                xprt_clear_wspace(xprt);
                status = xprt_sendmsg(xprt, req);

                if (status < 0)
                        break;

                if (xprt->stream) {
                        req->rq_bytes_sent += status;

                        if (req->rq_bytes_sent >= req->rq_slen)
                                goto out_receive;
                } else {
                        if (status >= req->rq_slen)
                                goto out_receive;
                        status = -EAGAIN;
                        break;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, req->rq_slen - req->rq_bytes_sent,
                                req->rq_slen);

                status = -EAGAIN;
                if (retry++ > 50)
                        break;
        }
        rpc_unlock_task(task);

        /* Note: at this point, task->tk_sleeping has not yet been set,
         *       hence there is no danger of the waking up task being put on
         *       schedq, and being picked up by a parallel run of rpciod().
         */
        rpc_wake_up_task(task);
        if (!RPC_IS_RUNNING(task))
                goto out_release;
        if (req->rq_received)
                goto out_release;

        task->tk_status = status;

        switch (status) {
        case -EAGAIN:
                /* Protect against (udp|tcp)_write_space */
                spin_lock_bh(&xprt_sock_lock);
                if (!xprt_wspace(xprt)) {
                        task->tk_timeout = req->rq_timeout.to_current;
                        rpc_sleep_on(&xprt->sending, task, NULL, NULL);
                }
                spin_unlock_bh(&xprt_sock_lock);
                return;
        case -ENOMEM:
                /* Keep holding the socket if it is blocked */
                rpc_delay(task, HZ>>4);
                return;
        default:
                goto out_release;
        }

 out_receive:
        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
        /* Set the task's receive timeout value */
        task->tk_timeout = req->rq_timeout.to_current;
        rpc_add_timer(task, xprt_timer);
        rpc_unlock_task(task);
 out_release:
        xprt_up_transmit(task);
}
/*
 * Queue the task for a reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_receive\n", task->tk_pid);

        req->rq_received = 0;
        task->tk_timeout = 0;
        rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
}
/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        /* We already have an initialized request. */
        if (task->tk_rqstp)
                return 0;

        dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
                                task->tk_pid, xprt->cong, xprt->cwnd);
        spin_lock_bh(&xprt_sock_lock);
        xprt_reserve_status(task);
        if (task->tk_rqstp) {
                task->tk_timeout = 0;
        } else if (!task->tk_timeout) {
                task->tk_status = -ENOBUFS;
        } else {
                dprintk("RPC:      xprt_reserve waiting on backlog\n");
                task->tk_status = -EAGAIN;
                rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
        }
        spin_unlock_bh(&xprt_sock_lock);
        dprintk("RPC: %4d xprt_reserve returns %d\n",
                                task->tk_pid, task->tk_status);
        return task->tk_status;
}
/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (xprt->shutdown) {
                task->tk_status = -EIO;
        } else if (task->tk_status < 0) {
                /* NOP */
        } else if (task->tk_rqstp) {
                /* We've already been given a request slot: NOP */
        } else {
                if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
                        goto out_nofree;
                /* OK: There's room for us. Grab a free slot and bump
                 * congestion value */
                xprt->free = req->rq_next;
                req->rq_next = NULL;
                xprt->cong += RPC_CWNDSCALE;
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);

                if (xprt->free)
                        xprt_clear_backlog(xprt);
        }

        return;

 out_nofree:
        task->tk_status = -EAGAIN;
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;
        static u32      xid = 0;

        if (!xid)
                xid = CURRENT_TIME << 12;

        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
        task->tk_status = 0;
        req->rq_timeout = xprt->timeout;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xid++;
}
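/*
 * Note (descriptive): the first reservation seeds the XID counter from
 * the clock (CURRENT_TIME << 12) so that XIDs from successive boots are
 * unlikely to collide in server-side reply caches; after that each
 * request simply takes xid++.
 */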
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        xprt_up_transmit(task);
        if (!(req = task->tk_rqstp))
                return;
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        /* remove slot from queue of pending */
        if (task->tk_rpcwait) {
                printk("RPC: task of released request still queued!\n");
                rpc_remove_wait_queue(task);
        }

        spin_lock_bh(&xprt_sock_lock);
        req->rq_next = xprt->free;
        xprt->free   = req;

        /* Decrease congestion value. */
        xprt->cong -= RPC_CWNDSCALE;

        xprt_clear_backlog(xprt);
        spin_unlock_bh(&xprt_sock_lock);
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
        if (proto == IPPROTO_UDP)
                xprt_set_timeout(to, 5, 5 * HZ);
        else
                xprt_set_timeout(to, 5, 60 * HZ);
}
/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_current   =
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
        to->to_resrvval  = incr * retr;
        to->to_retries   = retr;
        to->to_exponential = 0;
}
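/*
 * Example (illustrative): xprt_default_timeout() above calls
 * xprt_set_timeout(to, 5, 5 * HZ) for UDP, which yields
 *
 *      to_current = to_initval = to_increment = 5 * HZ
 *      to_maxval  = to_resrvval = 25 * HZ
 *      to_retries = 5, to_exponential = 0
 *
 * i.e. five linear five-second retransmit steps before a major timeout.
 */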
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
                        struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        int             i;

        dprintk("RPC:      setting up %s transport...\n",
                                proto == IPPROTO_UDP? "UDP" : "TCP");

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return NULL;
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        if (xprt->stream) {
                xprt->cwnd = RPC_MAXCWND;
                xprt->nocong = 1;
        } else {
                xprt->cwnd = RPC_INITCWND;
                xprt->congtime = jiffies;
        }
        init_waitqueue_head(&xprt->cong_wait);

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
                xprt->timeout.to_current = to->to_initval;
                xprt->timeout.to_resrvval = to->to_maxval << 1;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);

        xprt->pending = RPC_INIT_WAITQ("xprt_pending");
        xprt->sending = RPC_INIT_WAITQ("xprt_sending");
        xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
        xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

        /* initialize free list */
        for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
                req->rq_next = req + 1;
        req->rq_next = NULL;
        xprt->free = xprt->slot;

        INIT_LIST_HEAD(&xprt->rx_pending);

        dprintk("RPC:      created transport %p\n", xprt);

        xprt_bind_socket(xprt, sock);
        return xprt;
}
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
        struct sockaddr_in myaddr;
        int             err, port;

        memset(&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        port = 800;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                                sizeof(myaddr));
        } while (err == -EADDRINUSE && --port > 0);

        if (err < 0)
                printk("RPC: Can't bind to reserved port (%d).\n", -err);

        return err;
}
static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
        struct sock     *sk = sock->sk;

        if (xprt->inet)
                return -EBUSY;

        sk->user_data = xprt;
        xprt->old_data_ready = sk->data_ready;
        xprt->old_state_change = sk->state_change;
        xprt->old_write_space = sk->write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->data_ready = udp_data_ready;
                sk->write_space = udp_write_space;
                sk->no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                sk->data_ready = tcp_data_ready;
                sk->state_change = tcp_state_change;
                sk->write_space = tcp_write_space;
                xprt_clear_connected(xprt);
        }

        /* Reset to new socket */
        xprt->sock = sock;
        xprt->inet = sk;
        /*
         * TCP requires the rpc I/O daemon is present
         */
        if (xprt->stream)
                rpciod_up();

        return 0;
}
/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to)
{
        struct socket   *sock;
        int             type, err;

        dprintk("RPC:      xprt_create_socket(%s %d)\n",
                           (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

        if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
                printk("RPC: can't create socket (%d).\n", -err);
                return NULL;
        }

        /* If the caller has the capability, bind to a reserved port */
        if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0) {
                sock_release(sock);
                return NULL;
        }

        return sock;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct socket   *sock;
        struct rpc_xprt *xprt;

        dprintk("RPC:      xprt_create_proto called\n");

        if (!(sock = xprt_create_socket(proto, to)))
                return NULL;

        if (!(xprt = xprt_setup(sock, proto, sap, to)))
                sock_release(sock);

        return xprt;
}
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        rpc_wake_up(&xprt->reconn);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
}
/*
 * Clear the xprt backlog queue
 */
static int
xprt_clear_backlog(struct rpc_xprt *xprt) {
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        rpc_wake_up_next(&xprt->backlog);
        if (waitqueue_active(&xprt->cong_wait))
                wake_up(&xprt->cong_wait);
        return 1;
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC:      destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        xprt_close(xprt);
        kfree(xprt);

        return 0;
}