2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
39 #define __KERNEL_SYSCALLS__
41 #include <linux/version.h>
42 #include <linux/types.h>
43 #include <linux/malloc.h>
44 #include <linux/sched.h>
45 #include <linux/errno.h>
46 #include <linux/socket.h>
48 #include <linux/net.h>
50 #include <linux/udp.h>
51 #include <linux/unistd.h>
52 #include <linux/sunrpc/clnt.h>
53 #include <linux/file.h>
57 #include <asm/uaccess.h>
59 #define SOCK_HAS_USER_DATA
64 #ifndef SOCK_HAS_USER_DATA
65 static struct rpc_xprt
* sock_list
= NULL
;
69 # undef RPC_DEBUG_DATA
70 # define RPCDBG_FACILITY RPCDBG_XPRT
74 # define MAX(a, b) ((a) > (b)? (a) : (b))
75 # define MIN(a, b) ((a) < (b)? (a) : (b))
/*
 * Forward declarations of the file-local (static) helpers that are
 * referenced before their definitions appear further down.
 */
static void		xprt_request_init(struct rpc_task *task,
					  struct rpc_xprt *xprt);
static void		xprt_transmit_status(struct rpc_task *task);
static void		xprt_receive_status(struct rpc_task *task);
static void		xprt_reserve_status(struct rpc_task *task);
static void		xprt_reconn_timeout(struct rpc_task *task);
static void		xprt_reconn_status(struct rpc_task *task);
static struct socket *	xprt_create_socket(int proto,
					   struct sockaddr_in *sap,
					   struct rpc_timeout *to);
92 * Print the buffer contents (first 128 bytes only--just enough for
96 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
98 u8
*buf
= (u8
*) packet
;
101 dprintk("RPC: %s\n", msg
);
102 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
106 dprintk("0x%04x ", j
);
108 dprintk("%02x%02x%02x%02x ",
109 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
115 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
122 * Look up RPC transport given an INET socket
124 static inline struct rpc_xprt
*
125 xprt_from_sock(struct sock
*sk
)
127 #ifndef SOCK_HAS_USER_DATA
128 struct rpc_xprt
*xprt
;
130 for (xprt
= sock_list
; xprt
&& sk
!= xprt
->inet
; xprt
= xprt
->link
)
134 return (struct rpc_xprt
*) sk
->user_data
;
139 * Write data to socket.
142 xprt_sendmsg(struct rpc_xprt
*xprt
)
144 struct socket
*sock
= xprt
->sock
;
149 xprt_pktdump("packet data:",
150 xprt
->snd_buf
.io_vec
->iov_base
,
151 xprt
->snd_buf
.io_vec
->iov_len
);
153 #if LINUX_VERSION_CODE >= 0x020100
154 msg
.msg_flags
= MSG_DONTWAIT
;
155 msg
.msg_iov
= xprt
->snd_buf
.io_vec
;
156 msg
.msg_iovlen
= xprt
->snd_buf
.io_nr
;
157 msg
.msg_name
= (struct sockaddr
*) &xprt
->addr
;
158 msg
.msg_namelen
= sizeof(xprt
->addr
);
159 msg
.msg_control
= NULL
;
161 oldfs
= get_fs(); set_fs(get_ds());
162 result
= sock_sendmsg(sock
, &msg
, xprt
->snd_buf
.io_len
);
166 msg
.msg_iov
= xprt
->snd_buf
.io_vec
;
167 msg
.msg_iovlen
= xprt
->snd_buf
.io_nr
;
168 msg
.msg_name
= (struct sockaddr
*) &xprt
->addr
;
169 msg
.msg_namelen
= sizeof(xprt
->addr
);
170 msg
.msg_control
= NULL
;
172 oldfs
= get_fs(); set_fs(get_ds());
173 result
= sock
->ops
->sendmsg(sock
, &msg
, xprt
->snd_buf
.io_len
, 1, 0);
177 dprintk("RPC: xprt_sendmsg(%d) = %d\n",
178 xprt
->snd_buf
.io_len
, result
);
181 xprt
->snd_buf
.io_len
-= result
;
187 /* When the server has died, an ICMP port unreachable message
188 * prompts ECONNREFUSED.
191 case -ENOTCONN
: case -EPIPE
:
192 /* connection broken */
195 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
202 * Read data from socket
205 xprt_recvmsg(struct rpc_xprt
*xprt
, struct iovec
*iov
, int nr
, int len
)
207 struct socket
*sock
= xprt
->sock
;
208 struct sockaddr_in sin
;
213 #if LINUX_VERSION_CODE >= 0x020100
214 msg
.msg_flags
= MSG_DONTWAIT
;
218 msg
.msg_namelen
= sizeof(sin
);
219 msg
.msg_control
= NULL
;
221 oldfs
= get_fs(); set_fs(get_ds());
222 result
= sock_recvmsg(sock
, &msg
, len
, MSG_DONTWAIT
);
225 int alen
= sizeof(sin
);
230 msg
.msg_namelen
= sizeof(sin
);
231 msg
.msg_control
= NULL
;
233 oldfs
= get_fs(); set_fs(get_ds());
234 result
= sock
->ops
->recvmsg(sock
, &msg
, len
, 1, 0, &alen
);
241 dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
248 * Adjust RPC congestion window
249 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
252 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
254 unsigned long cwnd
= xprt
->cwnd
;
259 if (xprt
->cong
< cwnd
|| jiffies
< xprt
->congtime
)
261 /* The (cwnd >> 1) term makes sure
262 * the result gets rounded properly. */
263 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
264 if (cwnd
> RPC_MAXCWND
)
267 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
268 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 2) / RPC_CWNDSCALE
;
269 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
270 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
271 (xprt
->congtime
-jiffies
)*1000/HZ
);
272 } else if (result
== -ETIMEDOUT
) {
273 if ((cwnd
>>= 1) < RPC_CWNDSCALE
)
274 cwnd
= RPC_CWNDSCALE
;
275 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 3) / RPC_CWNDSCALE
;
276 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
277 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
278 (xprt
->congtime
-jiffies
)*1000/HZ
);
279 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
286 * Adjust timeout values etc for next retransmit
289 xprt_adjust_timeout(struct rpc_timeout
*to
)
291 if (to
->to_exponential
)
292 to
->to_current
<<= 1;
294 to
->to_current
+= to
->to_increment
;
295 if (to
->to_maxval
&& to
->to_current
>= to
->to_maxval
) {
296 to
->to_current
= to
->to_maxval
;
299 if (!to
->to_current
) {
300 printk(KERN_WARNING
"xprt_adjust_timeout: to_current = 0!\n");
301 to
->to_current
= 5 * HZ
;
303 pprintk("RPC: %lu %s\n", jiffies
,
304 to
->to_retries
? "retrans" : "timeout");
305 return (to
->to_retries
)--;
309 * Close down a transport socket
312 xprt_close(struct rpc_xprt
*xprt
)
314 struct sock
*sk
= xprt
->inet
;
316 #ifdef SOCK_HAS_USER_DATA
317 sk
->user_data
= NULL
;
319 sk
->data_ready
= xprt
->old_data_ready
;
320 sk
->state_change
= xprt
->old_state_change
;
321 sk
->write_space
= xprt
->old_write_space
;
326 sock_release(xprt
->sock
);
328 * TCP doesn't require the rpciod now - other things may
329 * but rpciod handles that not us.
336 * Mark a transport as disconnected
339 xprt_disconnect(struct rpc_xprt
*xprt
)
341 dprintk("RPC: disconnected transport %p\n", xprt
);
342 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
343 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
348 * Reconnect a broken TCP connection.
351 xprt_reconnect(struct rpc_task
*task
)
353 struct rpc_xprt
*xprt
= task
->tk_xprt
;
358 dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
359 task
->tk_pid
, xprt
, xprt
->connected
);
362 if (xprt
->connecting
) {
363 task
->tk_timeout
= xprt
->timeout
.to_maxval
;
364 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
367 xprt
->connecting
= 1;
369 /* Create an unconnected socket */
370 if (!(sock
= xprt_create_socket(xprt
->prot
, NULL
, &xprt
->timeout
)))
373 #if LINUX_VERSION_CODE >= 0x020100
376 inet
= (struct sock
*) sock
->data
;
378 inet
->data_ready
= xprt
->inet
->data_ready
;
379 inet
->state_change
= xprt
->inet
->state_change
;
380 inet
->write_space
= xprt
->inet
->write_space
;
381 #ifdef SOCK_HAS_USER_DATA
382 inet
->user_data
= xprt
;
385 dprintk("RPC: %4d closing old socket\n", task
->tk_pid
);
386 xprt_disconnect(xprt
);
389 /* Reset to new socket and default congestion */
392 xprt
->cwnd
= RPC_INITCWND
;
394 /* Now connect it asynchronously. */
395 dprintk("RPC: %4d connecting new socket\n", task
->tk_pid
);
396 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
397 sizeof(xprt
->addr
), O_NONBLOCK
);
399 if (status
!= -EINPROGRESS
&& status
!= -EALREADY
) {
400 printk("RPC: TCP connect error %d!\n", -status
);
404 dprintk("RPC: %4d connect status %d connected %d\n",
405 task
->tk_pid
, status
, xprt
->connected
);
406 task
->tk_timeout
= 60 * HZ
;
409 if (!xprt
->connected
) {
410 rpc_sleep_on(&xprt
->reconn
, task
,
411 xprt_reconn_status
, xprt_reconn_timeout
);
418 xprt
->connecting
= 0;
419 rpc_wake_up(&xprt
->reconn
);
423 task
->tk_timeout
= 30 * HZ
;
424 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
425 xprt
->connecting
= 0;
432 xprt_reconn_status(struct rpc_task
*task
)
434 struct rpc_xprt
*xprt
= task
->tk_xprt
;
436 dprintk("RPC: %4d xprt_reconn_status %d\n",
437 task
->tk_pid
, task
->tk_status
);
438 if (!xprt
->connected
&& task
->tk_status
!= -ETIMEDOUT
) {
439 task
->tk_timeout
= 30 * HZ
;
440 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, xprt_reconn_timeout
);
445 * Reconnect timeout. We just mark the transport as not being in the
446 * process of reconnecting, and leave the rest to the upper layers.
449 xprt_reconn_timeout(struct rpc_task
*task
)
451 dprintk("RPC: %4d xprt_reconn_timeout %d\n",
452 task
->tk_pid
, task
->tk_status
);
453 task
->tk_status
= -ENOTCONN
;
454 task
->tk_xprt
->connecting
= 0;
455 task
->tk_timeout
= 0;
456 rpc_wake_up_task(task
);
460 * Look up the RPC request corresponding to a reply.
462 static inline struct rpc_rqst
*
463 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
465 struct rpc_task
*head
, *task
;
466 struct rpc_rqst
*req
;
469 if ((head
= xprt
->pending
.task
) != NULL
) {
472 if ((req
= task
->tk_rqstp
) && req
->rq_xid
== xid
)
474 task
= task
->tk_next
;
476 printk("xprt_lookup_rqst: loop in Q!\n");
479 } while (task
!= head
);
481 dprintk("RPC: unknown XID %08x in reply.\n", xid
);
486 * Complete reply received.
487 * The TCP code relies on us to remove the request from xprt->pending.
490 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
492 struct rpc_task
*task
= req
->rq_task
;
494 req
->rq_rlen
= copied
;
497 /* Adjust congestion window */
498 xprt_adjust_cwnd(xprt
, copied
);
501 /* Profile only reads for now */
503 static unsigned long nextstat
= 0;
504 static unsigned long pkt_rtt
= 0, pkt_len
= 0, pkt_cnt
= 0;
507 pkt_len
+= req
->rq_slen
+ copied
;
508 pkt_rtt
+= jiffies
- req
->rq_xtime
;
509 if (nextstat
< jiffies
) {
510 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
511 printk("RPC: %ld %ld %ld %ld stat\n",
512 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
513 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
514 nextstat
= jiffies
+ 5 * HZ
;
519 /* ... and wake up the process. */
520 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
521 task
->tk_status
= copied
;
523 rpc_wake_up_task(task
);
528 * Input handler for RPC replies. Called from a bottom half and hence
532 udp_data_ready(struct sock
*sk
, int len
)
534 struct rpc_task
*task
;
535 struct rpc_xprt
*xprt
;
536 struct rpc_rqst
*rovr
;
538 struct iovec iov
[MAX_IOVEC
];
540 int err
, repsize
, copied
;
542 dprintk("RPC: udp_data_ready...\n");
543 if (!(xprt
= xprt_from_sock(sk
)))
545 dprintk("RPC: udp_data_ready client %p\n", xprt
);
547 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
549 repsize
= skb
->len
- 8; /* don't account for UDP header */
552 printk("RPC: impossible RPC reply size %d!\n", repsize
);
556 /* Look up the request corresponding to the given XID */
557 if (!(rovr
= xprt_lookup_rqst(xprt
, *(u32
*) (skb
->h
.raw
+ 8))))
559 task
= rovr
->rq_task
;
561 dprintk("RPC: %4d received reply\n", task
->tk_pid
);
562 xprt_pktdump("packet data:", (u32
*) (skb
->h
.raw
+8), repsize
);
564 if ((copied
= rovr
->rq_rlen
) > repsize
)
567 /* Okay, we have it. Copy datagram... */
568 memcpy(iov
, rovr
->rq_rvec
, rovr
->rq_rnr
* sizeof(iov
[0]));
569 oldfs
= get_fs(); set_fs(get_ds());
570 skb_copy_datagram_iovec(skb
, 8, iov
, copied
);
573 xprt_complete_rqst(xprt
, rovr
, copied
);
576 skb_free_datagram(sk
, skb
);
581 * TCP record receive routine
582 * This is not the most efficient code since we call recvfrom twice--
583 * first receiving the record marker and XID, then the data.
585 * The optimal solution would be a RPC support in the TCP layer, which
586 * would gather all data up to the next record marker and then pass us
587 * the list of all TCP segments ready to be copied.
590 tcp_input_record(struct rpc_xprt
*xprt
)
592 struct rpc_rqst
*req
;
596 int result
, maxcpy
, reclen
, avail
, want
;
598 dprintk("RPC: tcp_input_record\n");
599 offset
= xprt
->tcp_offset
;
601 if (offset
< 4 || (!xprt
->tcp_more
&& offset
< 8)) {
602 want
= (xprt
->tcp_more
? 4 : 8) - offset
;
603 dprintk("RPC: reading header (%d bytes)\n", want
);
604 riov
.iov_base
= xprt
->tcp_recm
.data
+ offset
;
606 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
615 /* Get the record length and mask out the more_fragments bit */
616 reclen
= ntohl(xprt
->tcp_reclen
);
617 dprintk("RPC: reclen %08x\n", reclen
);
618 xprt
->tcp_more
= (reclen
& 0x80000000)? 0 : 1;
619 if (!(reclen
&= 0x7fffffff)) {
620 printk(KERN_NOTICE
"RPC: empty TCP record.\n");
621 return -ENOTCONN
; /* break connection */
623 xprt
->tcp_total
+= reclen
;
624 xprt
->tcp_reclen
= reclen
;
626 dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
627 xprt
->tcp_xid
, xprt
->tcp_reclen
, xprt
->tcp_more
);
628 if (!xprt
->tcp_copied
629 && (req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
))) {
630 iov
= xprt
->tcp_iovec
;
631 memcpy(iov
, req
->rq_rvec
, req
->rq_rnr
* sizeof(iov
[0]));
633 *(u32
*)iov
->iov_base
= req
->rq_xid
;
637 xprt
->tcp_copied
= 4;
638 xprt
->tcp_rqstp
= req
;
641 reclen
= xprt
->tcp_reclen
;
644 avail
= reclen
- (offset
- 4);
645 if ((req
= xprt
->tcp_rqstp
) && req
->rq_xid
== xprt
->tcp_xid
646 && req
->rq_task
->tk_rpcwait
== &xprt
->pending
) {
647 want
= MIN(req
->rq_rlen
- xprt
->tcp_copied
, avail
);
649 dprintk("RPC: %4d TCP receiving %d bytes\n",
650 req
->rq_task
->tk_pid
, want
);
651 result
= xprt_recvmsg(xprt
, xprt
->tcp_iovec
, req
->rq_rnr
, want
);
654 xprt
->tcp_copied
+= result
;
662 maxcpy
= MIN(req
->rq_rlen
, xprt
->tcp_total
);
663 if (xprt
->tcp_copied
== maxcpy
&& !xprt
->tcp_more
) {
664 dprintk("RPC: %4d received reply complete\n",
665 req
->rq_task
->tk_pid
);
666 xprt_complete_rqst(xprt
, req
, xprt
->tcp_total
);
667 xprt
->tcp_copied
= 0;
668 xprt
->tcp_rqstp
= NULL
;
670 /* Request must be re-encoded before retransmit */
674 /* Skip over any trailing bytes on short reads */
678 want
= MIN(avail
, sizeof(dummy
));
679 riov
.iov_base
= dummy
;
681 dprintk("RPC: TCP skipping %d bytes\n", want
);
682 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
697 dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
698 offset
, xprt
->tcp_total
, xprt
->tcp_copied
);
699 xprt
->tcp_offset
= offset
;
704 * TCP task queue stuff
707 static struct rpc_xprt
*rpc_xprt_pending
= NULL
; /* Chain by rx_pending of rpc_xprt's */
710 * This is protected from tcp_data_ready and the stack as it's run
711 * inside of the RPC I/O daemon
714 void rpciod_tcp_dispatcher(void)
716 struct rpc_xprt
*xprt
;
719 dprintk("rpciod_tcp_dispatcher: Queue Running\n");
722 * Empty each pending socket
725 while((xprt
=rpc_xprt_pending
)!=NULL
)
729 rpc_xprt_pending
=xprt
->rx_pending
;
730 xprt
->rx_pending_flag
=0;
732 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt
);
736 if (safe_retry
++ > 50)
738 result
= tcp_input_record(xprt
);
747 xprt_disconnect(xprt
);
750 printk(KERN_WARNING
"RPC: unexpected error %d from tcp_input_record\n",
757 extern inline void tcp_rpciod_queue(void)
763 * data_ready callback for TCP. We can't just jump into the
764 * tcp recvmsg functions inside of the network receive bh or
765 * bad things occur. We queue it to pick up after networking
769 static void tcp_data_ready(struct sock
*sk
, int len
)
771 struct rpc_xprt
*xprt
;
773 dprintk("RPC: tcp_data_ready...\n");
774 if (!(xprt
= xprt_from_sock(sk
)))
776 printk("Not a socket with xprt %p\n", sk
);
779 dprintk("RPC: tcp_data_ready client %p\n", xprt
);
780 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
781 sk
->state
, xprt
->connected
,
782 sk
->dead
, sk
->zapped
);
784 * If we are not waiting for the RPC bh run then
787 if (!xprt
->rx_pending_flag
)
789 dprintk("RPC: xprt queue\n");
790 if(rpc_xprt_pending
==NULL
)
792 xprt
->rx_pending_flag
=1;
793 xprt
->rx_pending
=rpc_xprt_pending
;
794 rpc_xprt_pending
=xprt
;
797 dprintk("RPC: xprt queued already %p\n", xprt
);
802 tcp_state_change(struct sock
*sk
)
804 struct rpc_xprt
*xprt
;
806 if (!(xprt
= xprt_from_sock(sk
)))
808 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
809 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
810 sk
->state
, xprt
->connected
,
811 sk
->dead
, sk
->zapped
);
813 if (sk
->state
== TCP_ESTABLISHED
&& !xprt
->connected
) {
815 xprt
->connecting
= 0;
816 rpc_wake_up(&xprt
->reconn
);
817 } else if (sk
->zapped
) {
818 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
819 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
820 rpc_wake_up_status(&xprt
->reconn
, -ENOTCONN
);
825 tcp_write_space(struct sock
*sk
)
827 struct rpc_xprt
*xprt
;
829 if (!(xprt
= xprt_from_sock(sk
)))
831 xprt
->write_space
= 1;
832 if (xprt
->snd_task
&& !RPC_IS_RUNNING(xprt
->snd_task
))
833 rpc_wake_up_task(xprt
->snd_task
);
837 * RPC receive timeout handler.
840 xprt_timer(struct rpc_task
*task
)
843 xprt_adjust_cwnd(task
->tk_xprt
, -ETIMEDOUT
);
845 dprintk("RPC: %4d xprt_timer (%s request)\n", task
->tk_pid
,
846 task
->tk_rqstp
? "pending" : "backlogged");
848 task
->tk_status
= -ETIMEDOUT
;
849 task
->tk_timeout
= 0;
850 rpc_wake_up_task(task
);
854 * (Partly) transmit the RPC packet
855 * Note that task->tk_status is either 0 or negative on return.
856 * Only when the reply is received will the status be set to a
860 xprt_transmit_some(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
862 struct rpc_rqst
*req
= task
->tk_rqstp
;
866 if ((result
= xprt_sendmsg(xprt
)) >= 0) {
867 if (!xprt
->snd_buf
.io_len
|| !xprt
->stream
) {
868 rpc_wake_up_next(&xprt
->sending
);
872 } else if (xprt
->stream
) {
873 if (result
== -ENOTCONN
|| result
== -EPIPE
) {
874 xprt_disconnect(xprt
);
878 return task
->tk_status
= result
;
882 * Place the actual RPC call.
883 * We have to copy the iovec because sendmsg fiddles with its contents.
886 xprt_transmit(struct rpc_task
*task
)
888 struct rpc_timeout
*timeo
;
889 struct rpc_rqst
*req
= task
->tk_rqstp
;
890 struct rpc_xprt
*xprt
= req
->rq_xprt
;
892 dprintk("RPC: %4d xprt_transmit(%x)\n", task
->tk_pid
,
893 *(u32
*)(req
->rq_svec
[0].iov_base
));
895 if (xprt
->shutdown
) {
896 task
->tk_status
= -EIO
;
900 /* If we're not already in the process of transmitting our call,
901 * set up everything as needed. */
902 if (xprt
->snd_task
!= task
) {
903 /* Write the record marker */
907 if (!xprt
->connected
) {
908 task
->tk_status
= -ENOTCONN
;
911 marker
= htonl(0x80000000|(req
->rq_slen
-4));
912 *((u32
*) req
->rq_svec
[0].iov_base
) = marker
;
915 /* Reset timeout parameters */
916 timeo
= &req
->rq_timeout
;
917 if (timeo
->to_retries
< 0) {
918 dprintk("RPC: %4d xprt_transmit reset timeo\n",
920 timeo
->to_retries
= xprt
->timeout
.to_retries
;
921 timeo
->to_current
= timeo
->to_initval
;
925 req
->rq_xtime
= jiffies
;
929 if (xprt
->snd_task
) {
930 dprintk("RPC: %4d TCP write queue full (task %d)\n",
931 task
->tk_pid
, xprt
->snd_task
->tk_pid
);
932 rpc_sleep_on(&xprt
->sending
, task
,
933 xprt_transmit_status
, NULL
);
936 xprt
->snd_buf
= req
->rq_snd_buf
;
937 xprt
->snd_task
= task
;
940 /* For fast networks/servers we have to put the request on
941 * the pending list now:
944 rpc_add_wait_queue(&xprt
->pending
, task
);
945 task
->tk_callback
= NULL
;
948 /* Continue transmitting the packet/record. We must be careful
949 * to cope with writespace callbacks arriving _after_ we have
950 * called xprt_sendmsg().
953 xprt
->write_space
= 0;
954 if (xprt_transmit_some(xprt
, task
) != -EAGAIN
) {
955 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
956 xprt
->snd_task
= NULL
;
960 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
961 task
->tk_pid
, xprt
->snd_buf
.io_len
,
965 if (!xprt
->write_space
) {
966 /* Remove from pending */
967 rpc_remove_wait_queue(task
);
968 rpc_sleep_on(&xprt
->sending
, task
,
969 xprt_transmit_status
, NULL
);
978 * This callback is invoked when the sending task is forced to sleep
979 * because the TCP write buffers are full
982 xprt_transmit_status(struct rpc_task
*task
)
984 struct rpc_xprt
*xprt
= task
->tk_client
->cl_xprt
;
986 dprintk("RPC: %4d transmit_status %d\n", task
->tk_pid
, task
->tk_status
);
987 if (xprt
->snd_task
== task
) {
988 if (task
->tk_status
< 0)
989 xprt
->snd_task
= NULL
;
990 xprt_disconnect(xprt
);
995 * Wait for the reply to our call.
996 * When the callback is invoked, the congestion window should have
997 * been updated already.
1000 xprt_receive(struct rpc_task
*task
)
1002 struct rpc_rqst
*req
= task
->tk_rqstp
;
1003 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1005 dprintk("RPC: %4d xprt_receive\n", task
->tk_pid
);
1006 if (xprt
->connected
== 0) {
1007 task
->tk_status
= -ENOTCONN
;
1012 * Wait until rq_gotit goes non-null, or timeout elapsed.
1014 task
->tk_timeout
= req
->rq_timeout
.to_current
;
1017 if (!req
->rq_gotit
) {
1018 rpc_sleep_on(&xprt
->pending
, task
,
1019 xprt_receive_status
, xprt_timer
);
1023 dprintk("RPC: %4d xprt_receive returns %d\n",
1024 task
->tk_pid
, task
->tk_status
);
1028 xprt_receive_status(struct rpc_task
*task
)
1030 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1032 if (xprt
->stream
&& xprt
->tcp_rqstp
== task
->tk_rqstp
)
1033 xprt
->tcp_rqstp
= NULL
;
1037 * Reserve an RPC call slot.
1040 xprt_reserve(struct rpc_task
*task
)
1042 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1044 /* We already have an initialized request. */
1048 dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1049 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1050 if ((!RPCXPRT_CONGESTED(xprt
) && xprt
->free
)) {
1051 xprt_reserve_status(task
);
1052 task
->tk_timeout
= 0;
1053 } else if (!task
->tk_timeout
) {
1054 task
->tk_status
= -ENOBUFS
;
1056 dprintk("RPC: xprt_reserve waiting on backlog\n");
1057 rpc_sleep_on(&xprt
->backlog
, task
, xprt_reserve_status
, NULL
);
1059 dprintk("RPC: %4d xprt_reserve returns %d\n",
1060 task
->tk_pid
, task
->tk_status
);
1061 return task
->tk_status
;
1065 * Reservation callback
1068 xprt_reserve_status(struct rpc_task
*task
)
1070 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1071 struct rpc_rqst
*req
;
1073 if (xprt
->shutdown
) {
1074 task
->tk_status
= -EIO
;
1075 } else if (task
->tk_status
< 0) {
1077 } else if (task
->tk_rqstp
) {
1078 /* We've already been given a request slot: NOP */
1079 } else if (!RPCXPRT_CONGESTED(xprt
)) {
1080 /* OK: There's room for us. Grab a free slot and bump
1081 * congestion value */
1087 xprt
->free
= req
->rq_next
;
1088 xprt
->cong
+= RPC_CWNDSCALE
;
1089 task
->tk_rqstp
= req
;
1090 req
->rq_next
= NULL
;
1091 xprt_request_init(task
, xprt
);
1093 task
->tk_status
= -EAGAIN
;
1096 if (xprt
->free
&& !RPCXPRT_CONGESTED(xprt
))
1097 rpc_wake_up_next(&xprt
->backlog
);
1102 printk("RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
1103 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1107 printk("RPC: used rqst slot %p on free list!\n", req
);
1109 task
->tk_status
= -EIO
;
1115 * Initialize RPC request
1118 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1120 struct rpc_rqst
*req
= task
->tk_rqstp
;
1126 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
, req
, xid
);
1127 task
->tk_status
= 0;
1129 req
->rq_timeout
= xprt
->timeout
;
1130 req
->rq_task
= task
;
1131 req
->rq_xprt
= xprt
;
1132 req
->rq_xid
= xid
++;
1136 * Release an RPC call slot
1139 xprt_release(struct rpc_task
*task
)
1141 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1142 struct rpc_rqst
*req
;
1144 if (!(req
= task
->tk_rqstp
))
1146 task
->tk_rqstp
= NULL
;
1147 memset(req
, 0, sizeof(*req
)); /* mark unused */
1149 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
1151 /* remove slot from queue of pending */
1153 if (task
->tk_rpcwait
) {
1154 printk("RPC: task of released request still queued!\n");
1156 printk("RPC: (task is on %s)\n", rpc_qname(task
->tk_rpcwait
));
1158 rpc_del_timer(task
);
1159 rpc_remove_wait_queue(task
);
1163 /* Decrease congestion value. If congestion threshold is not yet
1164 * reached, pass on the request slot.
1165 * This looks kind of kludgy, but it guarantees backlogged requests
1166 * are served in order.
1168 xprt
->cong
-= RPC_CWNDSCALE
;
1169 if (!RPCXPRT_CONGESTED(xprt
)) {
1170 struct rpc_task
*next
= rpc_wake_up_next(&xprt
->backlog
);
1172 if (next
&& next
->tk_rqstp
== 0) {
1173 xprt
->cong
+= RPC_CWNDSCALE
;
1174 next
->tk_rqstp
= req
;
1175 xprt_request_init(next
, xprt
);
1180 req
->rq_next
= xprt
->free
;
1185 * Set default timeout parameters
1188 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1190 if (proto
== IPPROTO_UDP
)
1191 xprt_set_timeout(to
, 5, 5 * HZ
);
1193 xprt_set_timeout(to
, 5, 15 * HZ
);
1197 * Set constant timeout
1200 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1204 to
->to_increment
= incr
;
1205 to
->to_maxval
= incr
* retr
;
1206 to
->to_resrvval
= incr
* retr
;
1207 to
->to_retries
= retr
;
1208 to
->to_exponential
= 0;
1212 * Initialize an RPC client
1214 static struct rpc_xprt
*
1215 xprt_setup(struct socket
*sock
, int proto
,
1216 struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1218 struct rpc_xprt
*xprt
;
1219 struct rpc_rqst
*req
;
1223 dprintk("RPC: setting up %s transport...\n",
1224 proto
== IPPROTO_UDP
? "UDP" : "TCP");
1226 #if LINUX_VERSION_CODE >= 0x020100
1229 inet
= (struct sock
*) sock
->data
;
1232 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1234 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1241 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
1242 xprt
->cwnd
= RPC_INITCWND
;
1243 #ifdef SOCK_HAS_USER_DATA
1244 inet
->user_data
= xprt
;
1246 xprt
->link
= sock_list
;
1249 xprt
->old_data_ready
= inet
->data_ready
;
1250 xprt
->old_state_change
= inet
->state_change
;
1251 xprt
->old_write_space
= inet
->write_space
;
1252 if (proto
== IPPROTO_UDP
) {
1253 inet
->data_ready
= udp_data_ready
;
1255 inet
->data_ready
= tcp_data_ready
;
1256 inet
->state_change
= tcp_state_change
;
1257 inet
->write_space
= tcp_write_space
;
1260 xprt
->connected
= 1;
1262 /* Set timeout parameters */
1264 xprt
->timeout
= *to
;
1265 xprt
->timeout
.to_current
= to
->to_initval
;
1266 xprt
->timeout
.to_resrvval
= to
->to_maxval
<< 1;
1268 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
1271 xprt
->pending
= RPC_INIT_WAITQ("xprt_pending");
1272 xprt
->sending
= RPC_INIT_WAITQ("xprt_sending");
1273 xprt
->backlog
= RPC_INIT_WAITQ("xprt_backlog");
1274 xprt
->reconn
= RPC_INIT_WAITQ("xprt_reconn");
1276 /* initialize free list */
1277 for (i
= 0, req
= xprt
->slot
; i
< RPC_MAXREQS
-1; i
++, req
++)
1278 req
->rq_next
= req
+ 1;
1279 req
->rq_next
= NULL
;
1280 xprt
->free
= xprt
->slot
;
1282 dprintk("RPC: created transport %p\n", xprt
);
1285 * TCP requires the rpc I/O daemon is present
1287 if(proto
==IPPROTO_TCP
)
1293 * Create and initialize an RPC client given an open file.
1294 * This is obsolete now.
1298 xprt_create(struct file
*file
, struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1300 struct rpc_xprt
*xprt
;
1301 struct socket
*sock
;
1305 printk("RPC: file == NULL in xprt_create!\n");
1309 sock
= &file
->f_inode
->u
.socket_i
;
1310 if (sock
->ops
->family
!= PF_INET
) {
1311 printk(KERN_WARNING
"RPC: only INET sockets supported\n");
1315 proto
= (sock
->type
== SOCK_DGRAM
)? IPPROTO_UDP
: IPPROTO_TCP
;
1316 if ((xprt
= xprt_setup(sock
, proto
, ap
, to
)) != NULL
) {
1326 * Bind to a reserved port
1329 xprt_bindresvport(struct socket
*sock
)
1331 struct sockaddr_in myaddr
;
1334 memset(&myaddr
, 0, sizeof(myaddr
));
1335 myaddr
.sin_family
= AF_INET
;
1338 myaddr
.sin_port
= htons(port
);
1339 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
1341 } while (err
== -EADDRINUSE
&& --port
> 0);
1344 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
1350 * Create a client socket given the protocol and peer address.
1352 static struct socket
*
1353 xprt_create_socket(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1355 struct socket
*sock
;
1358 dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
1359 sap
? ntohl(sap
->sin_addr
.s_addr
) : 0,
1360 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
1362 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1363 if ((err
= sock_create(PF_INET
, type
, proto
, &sock
)) < 0) {
1364 printk("RPC: can't create socket (%d).\n", -err
);
1368 /* If the caller has root privs, bind to a reserved port */
1369 if (!current
->fsuid
&& xprt_bindresvport(sock
) < 0)
1372 if (type
== SOCK_STREAM
&& sap
) {
1373 err
= sock
->ops
->connect(sock
, (struct sockaddr
*) sap
,
1376 printk("RPC: TCP connect failed (%d).\n", -err
);
1389 * Create an RPC client transport given the protocol and peer address.
1392 xprt_create_proto(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1394 struct socket
*sock
;
1395 struct rpc_xprt
*xprt
;
1397 dprintk("RPC: xprt_create_proto called\n");
1399 if (!(sock
= xprt_create_socket(proto
, sap
, to
)))
1402 if (!(xprt
= xprt_setup(sock
, proto
, sap
, to
)))
1409 * Prepare for transport shutdown.
1412 xprt_shutdown(struct rpc_xprt
*xprt
)
1415 rpc_wake_up(&xprt
->sending
);
1416 rpc_wake_up(&xprt
->pending
);
1417 rpc_wake_up(&xprt
->backlog
);
1418 rpc_wake_up(&xprt
->reconn
);
1422 * Destroy an RPC transport, killing off all requests.
1425 xprt_destroy(struct rpc_xprt
*xprt
)
1427 #ifndef SOCK_HAS_USER_DATA
1428 struct rpc_xprt
**q
;
1430 for (q
= &sock_list
; *q
&& *q
!= xprt
; q
= &((*q
)->link
))
1433 printk(KERN_WARNING
"xprt_destroy: unknown socket!\n");
1434 return -EIO
; /* why is there no EBUGGYSOFTWARE */
1439 dprintk("RPC: destroying transport %p\n", xprt
);