2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue (xprt_reserve).
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status of -ETIMEDOUT.
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call again.
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
40 #define __KERNEL_SYSCALLS__
42 #include <linux/version.h>
43 #include <linux/types.h>
44 #include <linux/malloc.h>
45 #include <linux/sched.h>
46 #include <linux/errno.h>
47 #include <linux/socket.h>
49 #include <linux/net.h>
51 #include <linux/udp.h>
52 #include <linux/unistd.h>
53 #include <linux/sunrpc/clnt.h>
54 #include <linux/file.h>
58 #include <asm/uaccess.h>
60 #define SOCK_HAS_USER_DATA
65 #ifndef SOCK_HAS_USER_DATA
66 static struct rpc_xprt
* sock_list
= NULL
;
70 # undef RPC_DEBUG_DATA
71 # define RPCDBG_FACILITY RPCDBG_XPRT
75 # define MAX(a, b) ((a) > (b)? (a) : (b))
76 # define MIN(a, b) ((a) < (b)? (a) : (b))
82 static void xprt_request_init(struct rpc_task
*, struct rpc_xprt
*);
83 static void xprt_transmit_status(struct rpc_task
*task
);
84 static void xprt_receive_status(struct rpc_task
*task
);
85 static void xprt_reserve_status(struct rpc_task
*task
);
86 static void xprt_reconn_timeout(struct rpc_task
*task
);
87 static void xprt_reconn_status(struct rpc_task
*task
);
88 static struct socket
*xprt_create_socket(int, struct sockaddr_in
*,
89 struct rpc_timeout
*);
93 * Print the buffer contents (first 128 bytes only--just enough for
97 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
99 u8
*buf
= (u8
*) packet
;
102 dprintk("RPC: %s\n", msg
);
103 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
107 dprintk("0x%04x ", j
);
109 dprintk("%02x%02x%02x%02x ",
110 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
116 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
123 * Look up RPC transport given an INET socket
125 static inline struct rpc_xprt
*
126 xprt_from_sock(struct sock
*sk
)
128 #ifndef SOCK_HAS_USER_DATA
129 struct rpc_xprt
*xprt
;
131 for (xprt
= sock_list
; xprt
&& sk
!= xprt
->inet
; xprt
= xprt
->link
)
135 return (struct rpc_xprt
*) sk
->user_data
;
140 * Adjust the iovec to move on 'n' bytes
143 extern inline void xprt_move_iov(struct msghdr
*msg
, struct iovec
*niv
, int amount
)
145 struct iovec
*iv
=msg
->msg_iov
;
148 * Eat any sent iovecs
151 while(iv
->iov_len
< amount
)
161 * And chew down the partial one
164 niv
[0].iov_len
= iv
->iov_len
-amount
;
165 niv
[0].iov_base
=((unsigned char *)iv
->iov_base
)+amount
;
169 * And copy any others
172 for(amount
=1;amount
<msg
->msg_iovlen
; amount
++)
179 * Write data to socket.
183 xprt_sendmsg(struct rpc_xprt
*xprt
)
185 struct socket
*sock
= xprt
->sock
;
189 struct iovec niv
[MAX_IOVEC
];
191 xprt_pktdump("packet data:",
192 xprt
->snd_buf
.io_vec
->iov_base
,
193 xprt
->snd_buf
.io_vec
->iov_len
);
195 msg
.msg_flags
= MSG_DONTWAIT
;
196 msg
.msg_iov
= xprt
->snd_buf
.io_vec
;
197 msg
.msg_iovlen
= xprt
->snd_buf
.io_nr
;
198 msg
.msg_name
= (struct sockaddr
*) &xprt
->addr
;
199 msg
.msg_namelen
= sizeof(xprt
->addr
);
200 msg
.msg_control
= NULL
;
202 /* Don't repeat bytes */
205 xprt_move_iov(&msg
, niv
, xprt
->snd_sent
);
207 oldfs
= get_fs(); set_fs(get_ds());
208 result
= sock_sendmsg(sock
, &msg
, xprt
->snd_buf
.io_len
);
211 dprintk("RPC: xprt_sendmsg(%d) = %d\n",
212 xprt
->snd_buf
.io_len
, result
);
215 xprt
->snd_buf
.io_len
-= result
;
216 xprt
->snd_sent
+= result
;
222 /* When the server has died, an ICMP port unreachable message
223 * prompts ECONNREFUSED.
228 case -ENOTCONN
: case -EPIPE
:
229 /* connection broken */
232 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
239 * Read data from socket
242 xprt_recvmsg(struct rpc_xprt
*xprt
, struct iovec
*iov
, int nr
, int len
)
244 struct socket
*sock
= xprt
->sock
;
245 struct sockaddr_in sin
;
250 #if LINUX_VERSION_CODE >= 0x020100
251 msg
.msg_flags
= MSG_DONTWAIT
;
255 msg
.msg_namelen
= sizeof(sin
);
256 msg
.msg_control
= NULL
;
258 oldfs
= get_fs(); set_fs(get_ds());
259 result
= sock_recvmsg(sock
, &msg
, len
, MSG_DONTWAIT
);
262 int alen
= sizeof(sin
);
267 msg
.msg_namelen
= sizeof(sin
);
268 msg
.msg_control
= NULL
;
270 oldfs
= get_fs(); set_fs(get_ds());
271 result
= sock
->ops
->recvmsg(sock
, &msg
, len
, 1, 0, &alen
);
278 dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
285 * Adjust RPC congestion window
286 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
289 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
291 unsigned long cwnd
= xprt
->cwnd
;
296 if (xprt
->cong
< cwnd
|| time_before(jiffies
, xprt
->congtime
))
298 /* The (cwnd >> 1) term makes sure
299 * the result gets rounded properly. */
300 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
301 if (cwnd
> RPC_MAXCWND
)
304 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
305 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 2) / RPC_CWNDSCALE
;
306 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
307 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
308 (xprt
->congtime
-jiffies
)*1000/HZ
);
309 } else if (result
== -ETIMEDOUT
) {
310 if ((cwnd
>>= 1) < RPC_CWNDSCALE
)
311 cwnd
= RPC_CWNDSCALE
;
312 xprt
->congtime
= jiffies
+ ((cwnd
* HZ
) << 3) / RPC_CWNDSCALE
;
313 dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
314 "time %ld ms\n", xprt
->cong
, xprt
->cwnd
, cwnd
,
315 (xprt
->congtime
-jiffies
)*1000/HZ
);
316 pprintk("RPC: %lu %ld cwnd\n", jiffies
, cwnd
);
323 * Adjust timeout values etc for next retransmit
326 xprt_adjust_timeout(struct rpc_timeout
*to
)
328 if (to
->to_exponential
)
329 to
->to_current
<<= 1;
331 to
->to_current
+= to
->to_increment
;
332 if (to
->to_maxval
&& to
->to_current
>= to
->to_maxval
) {
333 to
->to_current
= to
->to_maxval
;
336 if (!to
->to_current
) {
337 printk(KERN_WARNING
"xprt_adjust_timeout: to_current = 0!\n");
338 to
->to_current
= 5 * HZ
;
340 pprintk("RPC: %lu %s\n", jiffies
,
341 to
->to_retries
? "retrans" : "timeout");
342 return (to
->to_retries
)--;
346 * Close down a transport socket
349 xprt_close(struct rpc_xprt
*xprt
)
351 struct sock
*sk
= xprt
->inet
;
353 #ifdef SOCK_HAS_USER_DATA
354 sk
->user_data
= NULL
;
356 sk
->data_ready
= xprt
->old_data_ready
;
357 sk
->state_change
= xprt
->old_state_change
;
358 sk
->write_space
= xprt
->old_write_space
;
363 sock_release(xprt
->sock
);
365 * TCP doesn't require the rpciod now - other things may
366 * but rpciod handles that not us.
373 * Mark a transport as disconnected
376 xprt_disconnect(struct rpc_xprt
*xprt
)
378 dprintk("RPC: disconnected transport %p\n", xprt
);
379 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
380 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
385 * Reconnect a broken TCP connection.
388 xprt_reconnect(struct rpc_task
*task
)
390 struct rpc_xprt
*xprt
= task
->tk_xprt
;
395 dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
396 task
->tk_pid
, xprt
, xprt
->connected
);
399 if (xprt
->connecting
) {
400 task
->tk_timeout
= xprt
->timeout
.to_maxval
;
401 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
404 xprt
->connecting
= 1;
406 /* Create an unconnected socket */
407 if (!(sock
= xprt_create_socket(xprt
->prot
, NULL
, &xprt
->timeout
)))
410 #if LINUX_VERSION_CODE >= 0x020100
413 inet
= (struct sock
*) sock
->data
;
415 inet
->data_ready
= xprt
->inet
->data_ready
;
416 inet
->state_change
= xprt
->inet
->state_change
;
417 inet
->write_space
= xprt
->inet
->write_space
;
418 #ifdef SOCK_HAS_USER_DATA
419 inet
->user_data
= xprt
;
422 dprintk("RPC: %4d closing old socket\n", task
->tk_pid
);
423 xprt_disconnect(xprt
);
426 /* Reset to new socket and default congestion */
429 xprt
->cwnd
= RPC_INITCWND
;
431 /* Now connect it asynchronously. */
432 dprintk("RPC: %4d connecting new socket\n", task
->tk_pid
);
433 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
434 sizeof(xprt
->addr
), O_NONBLOCK
);
436 if (status
!= -EINPROGRESS
&& status
!= -EALREADY
) {
437 printk("RPC: TCP connect error %d!\n", -status
);
441 dprintk("RPC: %4d connect status %d connected %d\n",
442 task
->tk_pid
, status
, xprt
->connected
);
443 task
->tk_timeout
= 60 * HZ
;
446 if (!xprt
->connected
) {
447 rpc_sleep_on(&xprt
->reconn
, task
,
448 xprt_reconn_status
, xprt_reconn_timeout
);
455 xprt
->connecting
= 0;
456 rpc_wake_up(&xprt
->reconn
);
460 task
->tk_timeout
= 30 * HZ
;
461 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, NULL
);
462 xprt
->connecting
= 0;
469 xprt_reconn_status(struct rpc_task
*task
)
471 struct rpc_xprt
*xprt
= task
->tk_xprt
;
473 dprintk("RPC: %4d xprt_reconn_status %d\n",
474 task
->tk_pid
, task
->tk_status
);
475 if (!xprt
->connected
&& task
->tk_status
!= -ETIMEDOUT
) {
476 task
->tk_timeout
= 30 * HZ
;
477 rpc_sleep_on(&xprt
->reconn
, task
, NULL
, xprt_reconn_timeout
);
482 * Reconnect timeout. We just mark the transport as not being in the
483 * process of reconnecting, and leave the rest to the upper layers.
486 xprt_reconn_timeout(struct rpc_task
*task
)
488 dprintk("RPC: %4d xprt_reconn_timeout %d\n",
489 task
->tk_pid
, task
->tk_status
);
490 task
->tk_status
= -ENOTCONN
;
491 task
->tk_xprt
->connecting
= 0;
492 task
->tk_timeout
= 0;
493 rpc_wake_up_task(task
);
497 * Look up the RPC request corresponding to a reply.
499 static inline struct rpc_rqst
*
500 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
502 struct rpc_task
*head
, *task
;
503 struct rpc_rqst
*req
;
506 if ((head
= xprt
->pending
.task
) != NULL
) {
509 if ((req
= task
->tk_rqstp
) && req
->rq_xid
== xid
)
511 task
= task
->tk_next
;
513 printk("xprt_lookup_rqst: loop in Q!\n");
516 } while (task
!= head
);
518 dprintk("RPC: unknown XID %08x in reply.\n", xid
);
523 * Complete reply received.
524 * The TCP code relies on us to remove the request from xprt->pending.
527 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
529 struct rpc_task
*task
= req
->rq_task
;
531 req
->rq_rlen
= copied
;
534 /* Adjust congestion window */
535 xprt_adjust_cwnd(xprt
, copied
);
538 /* Profile only reads for now */
540 static unsigned long nextstat
= 0;
541 static unsigned long pkt_rtt
= 0, pkt_len
= 0, pkt_cnt
= 0;
544 pkt_len
+= req
->rq_slen
+ copied
;
545 pkt_rtt
+= jiffies
- req
->rq_xtime
;
546 if (time_before(nextstat
, jiffies
)) {
547 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
548 printk("RPC: %ld %ld %ld %ld stat\n",
549 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
550 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
551 nextstat
= jiffies
+ 5 * HZ
;
556 /* ... and wake up the process. */
557 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
558 task
->tk_status
= copied
;
560 rpc_wake_up_task(task
);
565 * Input handler for RPC replies. Called from a bottom half and hence
569 udp_data_ready(struct sock
*sk
, int len
)
571 struct rpc_task
*task
;
572 struct rpc_xprt
*xprt
;
573 struct rpc_rqst
*rovr
;
575 struct iovec iov
[MAX_IOVEC
];
576 int err
, repsize
, copied
;
578 dprintk("RPC: udp_data_ready...\n");
579 if (!(xprt
= xprt_from_sock(sk
)))
581 dprintk("RPC: udp_data_ready client %p\n", xprt
);
583 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
585 repsize
= skb
->len
- 8; /* don't account for UDP header */
588 printk("RPC: impossible RPC reply size %d!\n", repsize
);
592 /* Look up the request corresponding to the given XID */
593 if (!(rovr
= xprt_lookup_rqst(xprt
, *(u32
*) (skb
->h
.raw
+ 8))))
595 task
= rovr
->rq_task
;
597 dprintk("RPC: %4d received reply\n", task
->tk_pid
);
598 xprt_pktdump("packet data:", (u32
*) (skb
->h
.raw
+8), repsize
);
600 if ((copied
= rovr
->rq_rlen
) > repsize
)
603 /* Okay, we have it. Copy datagram... */
604 memcpy(iov
, rovr
->rq_rvec
, rovr
->rq_rnr
* sizeof(iov
[0]));
605 /* This needs to stay tied with the usermode skb_copy_datagram... */
606 memcpy_tokerneliovec(iov
, skb
->data
+8, copied
);
608 xprt_complete_rqst(xprt
, rovr
, copied
);
611 skb_free_datagram(sk
, skb
);
616 * TCP record receive routine
617 * This is not the most efficient code since we call recvfrom twice--
618 * first receiving the record marker and XID, then the data.
620 * The optimal solution would be a RPC support in the TCP layer, which
621 * would gather all data up to the next record marker and then pass us
622 * the list of all TCP segments ready to be copied.
625 tcp_input_record(struct rpc_xprt
*xprt
)
627 struct rpc_rqst
*req
;
631 int result
, maxcpy
, reclen
, avail
, want
;
633 dprintk("RPC: tcp_input_record\n");
634 offset
= xprt
->tcp_offset
;
636 if (offset
< 4 || (!xprt
->tcp_more
&& offset
< 8)) {
637 want
= (xprt
->tcp_more
? 4 : 8) - offset
;
638 dprintk("RPC: reading header (%d bytes)\n", want
);
639 riov
.iov_base
= xprt
->tcp_recm
.data
+ offset
;
641 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
650 /* Get the record length and mask out the more_fragments bit */
651 reclen
= ntohl(xprt
->tcp_reclen
);
652 dprintk("RPC: reclen %08x\n", reclen
);
653 xprt
->tcp_more
= (reclen
& 0x80000000)? 0 : 1;
654 if (!(reclen
&= 0x7fffffff)) {
655 printk(KERN_NOTICE
"RPC: empty TCP record.\n");
656 return -ENOTCONN
; /* break connection */
658 xprt
->tcp_total
+= reclen
;
659 xprt
->tcp_reclen
= reclen
;
661 dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
662 xprt
->tcp_xid
, xprt
->tcp_reclen
, xprt
->tcp_more
);
663 if (!xprt
->tcp_copied
664 && (req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
))) {
665 iov
= xprt
->tcp_iovec
;
666 memcpy(iov
, req
->rq_rvec
, req
->rq_rnr
* sizeof(iov
[0]));
668 *(u32
*)iov
->iov_base
= req
->rq_xid
;
672 xprt
->tcp_copied
= 4;
673 xprt
->tcp_rqstp
= req
;
676 reclen
= xprt
->tcp_reclen
;
679 avail
= reclen
- (offset
- 4);
680 if ((req
= xprt
->tcp_rqstp
) && req
->rq_xid
== xprt
->tcp_xid
681 && req
->rq_task
->tk_rpcwait
== &xprt
->pending
) {
682 want
= MIN(req
->rq_rlen
- xprt
->tcp_copied
, avail
);
684 dprintk("RPC: %4d TCP receiving %d bytes\n",
685 req
->rq_task
->tk_pid
, want
);
686 result
= xprt_recvmsg(xprt
, xprt
->tcp_iovec
, req
->rq_rnr
, want
);
689 xprt
->tcp_copied
+= result
;
697 maxcpy
= MIN(req
->rq_rlen
, xprt
->tcp_total
);
698 if (xprt
->tcp_copied
== maxcpy
&& !xprt
->tcp_more
) {
699 dprintk("RPC: %4d received reply complete\n",
700 req
->rq_task
->tk_pid
);
701 xprt_complete_rqst(xprt
, req
, xprt
->tcp_total
);
702 xprt
->tcp_copied
= 0;
703 xprt
->tcp_rqstp
= NULL
;
705 /* Request must be re-encoded before retransmit */
709 /* Skip over any trailing bytes on short reads */
713 want
= MIN(avail
, sizeof(dummy
));
714 riov
.iov_base
= dummy
;
716 dprintk("RPC: TCP skipping %d bytes\n", want
);
717 result
= xprt_recvmsg(xprt
, &riov
, 1, want
);
732 dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
733 offset
, xprt
->tcp_total
, xprt
->tcp_copied
);
734 xprt
->tcp_offset
= offset
;
739 * TCP task queue stuff
742 static struct rpc_xprt
*rpc_xprt_pending
= NULL
; /* Chain by rx_pending of rpc_xprt's */
745 * This is protected from tcp_data_ready and the stack as it's run
746 * inside of the RPC I/O daemon
749 void rpciod_tcp_dispatcher(void)
751 struct rpc_xprt
*xprt
;
754 dprintk("rpciod_tcp_dispatcher: Queue Running\n");
757 * Empty each pending socket
760 while((xprt
=rpc_xprt_pending
)!=NULL
)
764 rpc_xprt_pending
=xprt
->rx_pending
;
765 xprt
->rx_pending_flag
=0;
767 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt
);
771 if (safe_retry
++ > 50)
773 result
= tcp_input_record(xprt
);
782 xprt_disconnect(xprt
);
785 printk(KERN_WARNING
"RPC: unexpected error %d from tcp_input_record\n",
792 extern inline void tcp_rpciod_queue(void)
798 * data_ready callback for TCP. We can't just jump into the
799 * tcp recvmsg functions inside of the network receive bh or
800 * bad things occur. We queue it to pick up after networking
804 static void tcp_data_ready(struct sock
*sk
, int len
)
806 struct rpc_xprt
*xprt
;
808 dprintk("RPC: tcp_data_ready...\n");
809 if (!(xprt
= xprt_from_sock(sk
)))
811 printk("Not a socket with xprt %p\n", sk
);
814 dprintk("RPC: tcp_data_ready client %p\n", xprt
);
815 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
816 sk
->state
, xprt
->connected
,
817 sk
->dead
, sk
->zapped
);
819 * If we are not waiting for the RPC bh run then
822 if (!xprt
->rx_pending_flag
)
826 dprintk("RPC: xprt queue %p\n", rpc_xprt_pending
);
827 if(rpc_xprt_pending
==NULL
)
829 xprt
->rx_pending_flag
=1;
830 xprt
->rx_pending
=rpc_xprt_pending
;
831 rpc_xprt_pending
=xprt
;
839 dprintk("RPC: xprt queued already %p\n", xprt
);
844 tcp_state_change(struct sock
*sk
)
846 struct rpc_xprt
*xprt
;
848 if (!(xprt
= xprt_from_sock(sk
)))
850 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
851 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
852 sk
->state
, xprt
->connected
,
853 sk
->dead
, sk
->zapped
);
855 if (sk
->state
== TCP_ESTABLISHED
&& !xprt
->connected
) {
857 xprt
->connecting
= 0;
858 rpc_wake_up(&xprt
->reconn
);
859 } else if (sk
->zapped
) {
860 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
861 rpc_wake_up_status(&xprt
->sending
, -ENOTCONN
);
862 rpc_wake_up_status(&xprt
->reconn
, -ENOTCONN
);
867 tcp_write_space(struct sock
*sk
)
869 struct rpc_xprt
*xprt
;
871 if (!(xprt
= xprt_from_sock(sk
)))
873 if(xprt
->snd_sent
&& xprt
->snd_task
)
874 printk("write space\n");
875 if(xprt
->write_space
== 0)
877 xprt
->write_space
= 1;
878 if (xprt
->snd_task
&& !RPC_IS_RUNNING(xprt
->snd_task
))
881 printk("Write wakeup snd_sent =%d\n",
883 rpc_wake_up_task(xprt
->snd_task
);
889 * RPC receive timeout handler.
892 xprt_timer(struct rpc_task
*task
)
894 struct rpc_rqst
*req
= task
->tk_rqstp
;
897 xprt_adjust_cwnd(task
->tk_xprt
, -ETIMEDOUT
);
900 dprintk("RPC: %4d xprt_timer (%s request)\n",
901 task
->tk_pid
, req
? "pending" : "backlogged");
903 task
->tk_status
= -ETIMEDOUT
;
904 task
->tk_timeout
= 0;
905 rpc_wake_up_task(task
);
909 * (Partly) transmit the RPC packet
910 * Note that task->tk_status is either 0 or negative on return.
911 * Only when the reply is received will the status be set to a
915 xprt_transmit_some(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
917 struct rpc_rqst
*req
= task
->tk_rqstp
;
921 if ((result
= xprt_sendmsg(xprt
)) >= 0) {
922 if (!xprt
->snd_buf
.io_len
|| !xprt
->stream
) {
923 rpc_wake_up_next(&xprt
->sending
);
927 } else if (xprt
->stream
) {
928 if (result
== -ENOTCONN
|| result
== -EPIPE
) {
929 xprt_disconnect(xprt
);
933 return task
->tk_status
= result
;
937 * Place the actual RPC call.
938 * We have to copy the iovec because sendmsg fiddles with its contents.
941 xprt_transmit(struct rpc_task
*task
)
943 struct rpc_timeout
*timeo
;
944 struct rpc_rqst
*req
= task
->tk_rqstp
;
945 struct rpc_xprt
*xprt
= req
->rq_xprt
;
948 /*DEBUG*/int ac_debug
=xprt
->snd_sent
;
950 dprintk("RPC: %4d xprt_transmit(%x)\n", task
->tk_pid
,
951 *(u32
*)(req
->rq_svec
[0].iov_base
));
953 if (xprt
->shutdown
) {
954 task
->tk_status
= -EIO
;
958 /* If we're not already in the process of transmitting our call,
959 * set up everything as needed. */
960 if (xprt
->snd_task
!= task
) {
961 /* Write the record marker */
965 if (!xprt
->connected
) {
966 task
->tk_status
= -ENOTCONN
;
969 marker
= htonl(0x80000000|(req
->rq_slen
-4));
970 *((u32
*) req
->rq_svec
[0].iov_base
) = marker
;
973 /* Reset timeout parameters */
974 timeo
= &req
->rq_timeout
;
975 if (timeo
->to_retries
< 0) {
976 dprintk("RPC: %4d xprt_transmit reset timeo\n",
978 timeo
->to_retries
= xprt
->timeout
.to_retries
;
979 timeo
->to_current
= timeo
->to_initval
;
983 req
->rq_xtime
= jiffies
;
987 if (xprt
->snd_task
) {
988 dprintk("RPC: %4d TCP write queue full (task %d)\n",
989 task
->tk_pid
, xprt
->snd_task
->tk_pid
);
990 rpc_sleep_on(&xprt
->sending
, task
,
991 xprt_transmit_status
, NULL
);
994 xprt
->snd_buf
= req
->rq_snd_buf
;
995 xprt
->snd_task
= task
;
997 /*DEBUG*/ac_debug
= 0;
1000 /* For fast networks/servers we have to put the request on
1001 * the pending list now:
1004 status
= rpc_add_wait_queue(&xprt
->pending
, task
);
1006 task
->tk_callback
= NULL
;
1011 printk(KERN_WARNING
"RPC: failed to add task to queue: error: %d!\n", status
);
1012 task
->tk_status
= status
;
1016 /* Continue transmitting the packet/record. We must be careful
1017 * to cope with writespace callbacks arriving _after_ we have
1018 * called xprt_sendmsg().
1021 xprt
->write_space
= 0;
1022 if (xprt_transmit_some(xprt
, task
) != -EAGAIN
) {
1023 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
1024 xprt
->snd_task
= NULL
;
1026 printk("Partial xmit finished\n");
1030 /*d*/printk("RPC: %4d xmit incomplete (%d left of %d)\n",
1031 task
->tk_pid
, xprt
->snd_buf
.io_len
,
1033 task
->tk_status
= 0;
1035 if (!xprt
->write_space
) {
1036 /* Remove from pending */
1037 rpc_remove_wait_queue(task
);
1038 rpc_sleep_on(&xprt
->sending
, task
,
1039 xprt_transmit_status
, NULL
);
1048 * This callback is invoked when the sending task is forced to sleep
1049 * because the TCP write buffers are full
1052 xprt_transmit_status(struct rpc_task
*task
)
1054 struct rpc_xprt
*xprt
= task
->tk_client
->cl_xprt
;
1056 dprintk("RPC: %4d transmit_status %d\n", task
->tk_pid
, task
->tk_status
);
1057 if (xprt
->snd_task
== task
)
1059 if (task
->tk_status
< 0)
1061 xprt
->snd_task
= NULL
;
1062 xprt_disconnect(xprt
);
1065 xprt_transmit(task
);
1070 * Wait for the reply to our call.
1071 * When the callback is invoked, the congestion window should have
1072 * been updated already.
1075 xprt_receive(struct rpc_task
*task
)
1077 struct rpc_rqst
*req
= task
->tk_rqstp
;
1078 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1080 dprintk("RPC: %4d xprt_receive\n", task
->tk_pid
);
1081 if (xprt
->connected
== 0) {
1082 task
->tk_status
= -ENOTCONN
;
1087 * Wait until rq_gotit goes non-null, or timeout elapsed.
1089 task
->tk_timeout
= req
->rq_timeout
.to_current
;
1092 if (!req
->rq_gotit
) {
1093 rpc_sleep_on(&xprt
->pending
, task
,
1094 xprt_receive_status
, xprt_timer
);
1098 dprintk("RPC: %4d xprt_receive returns %d\n",
1099 task
->tk_pid
, task
->tk_status
);
1103 xprt_receive_status(struct rpc_task
*task
)
1105 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1107 if (xprt
->stream
&& xprt
->tcp_rqstp
== task
->tk_rqstp
)
1108 xprt
->tcp_rqstp
= NULL
;
1112 * Reserve an RPC call slot.
1115 xprt_reserve(struct rpc_task
*task
)
1117 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1119 /* We already have an initialized request. */
1123 dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1124 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1125 if ((!RPCXPRT_CONGESTED(xprt
) && xprt
->free
)) {
1126 xprt_reserve_status(task
);
1127 task
->tk_timeout
= 0;
1128 } else if (!task
->tk_timeout
) {
1129 task
->tk_status
= -ENOBUFS
;
1131 dprintk("RPC: xprt_reserve waiting on backlog\n");
1132 rpc_sleep_on(&xprt
->backlog
, task
, xprt_reserve_status
, NULL
);
1134 dprintk("RPC: %4d xprt_reserve returns %d\n",
1135 task
->tk_pid
, task
->tk_status
);
1136 return task
->tk_status
;
1140 * Reservation callback
1143 xprt_reserve_status(struct rpc_task
*task
)
1145 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1146 struct rpc_rqst
*req
;
1148 if (xprt
->shutdown
) {
1149 task
->tk_status
= -EIO
;
1150 } else if (task
->tk_status
< 0) {
1152 } else if (task
->tk_rqstp
) {
1153 /* We've already been given a request slot: NOP */
1154 } else if (!RPCXPRT_CONGESTED(xprt
)) {
1155 /* OK: There's room for us. Grab a free slot and bump
1156 * congestion value */
1162 xprt
->free
= req
->rq_next
;
1163 xprt
->cong
+= RPC_CWNDSCALE
;
1164 task
->tk_rqstp
= req
;
1165 req
->rq_next
= NULL
;
1166 xprt_request_init(task
, xprt
);
1168 task
->tk_status
= -EAGAIN
;
1171 if (xprt
->free
&& !RPCXPRT_CONGESTED(xprt
))
1172 rpc_wake_up_next(&xprt
->backlog
);
1178 "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
1179 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
1183 printk(KERN_ERR
"RPC: used rqst slot %p on free list!\n", req
);
1185 task
->tk_status
= -EIO
;
1191 * Initialize RPC request
1194 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1196 struct rpc_rqst
*req
= task
->tk_rqstp
;
1202 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
, req
, xid
);
1203 task
->tk_status
= 0;
1205 req
->rq_timeout
= xprt
->timeout
;
1206 req
->rq_task
= task
;
1207 req
->rq_xprt
= xprt
;
1208 req
->rq_xid
= xid
++;
1212 * Release an RPC call slot
1215 xprt_release(struct rpc_task
*task
)
1217 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1218 struct rpc_rqst
*req
;
1220 if (!(req
= task
->tk_rqstp
))
1222 task
->tk_rqstp
= NULL
;
1223 memset(req
, 0, sizeof(*req
)); /* mark unused */
1225 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
1227 /* remove slot from queue of pending */
1229 if (task
->tk_rpcwait
) {
1230 printk("RPC: task of released request still queued!\n");
1232 printk("RPC: (task is on %s)\n", rpc_qname(task
->tk_rpcwait
));
1234 rpc_del_timer(task
);
1235 rpc_remove_wait_queue(task
);
1239 /* Decrease congestion value. */
1240 xprt
->cong
-= RPC_CWNDSCALE
;
1243 /* If congestion threshold is not yet reached, pass on the request slot.
1244 * This looks kind of kludgy, but it guarantees backlogged requests
1245 * are served in order.
1246 * N.B. This doesn't look completely safe, as the task is still
1247 * on the backlog list after wake-up.
1249 if (!RPCXPRT_CONGESTED(xprt
)) {
1250 struct rpc_task
*next
= rpc_wake_up_next(&xprt
->backlog
);
1252 if (next
&& next
->tk_rqstp
== 0) {
1253 xprt
->cong
+= RPC_CWNDSCALE
;
1254 next
->tk_rqstp
= req
;
1255 xprt_request_init(next
, xprt
);
1261 req
->rq_next
= xprt
->free
;
1264 /* If not congested, wake up the next backlogged process */
1265 if (!RPCXPRT_CONGESTED(xprt
))
1266 rpc_wake_up_next(&xprt
->backlog
);
1270 * Set default timeout parameters
1273 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1275 if (proto
== IPPROTO_UDP
)
1276 xprt_set_timeout(to
, 5, 5 * HZ
);
1278 xprt_set_timeout(to
, 5, 15 * HZ
);
1282 * Set constant timeout
1285 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1289 to
->to_increment
= incr
;
1290 to
->to_maxval
= incr
* retr
;
1291 to
->to_resrvval
= incr
* retr
;
1292 to
->to_retries
= retr
;
1293 to
->to_exponential
= 0;
1297 * Initialize an RPC client
1299 static struct rpc_xprt
*
1300 xprt_setup(struct socket
*sock
, int proto
,
1301 struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1303 struct rpc_xprt
*xprt
;
1304 struct rpc_rqst
*req
;
1308 dprintk("RPC: setting up %s transport...\n",
1309 proto
== IPPROTO_UDP
? "UDP" : "TCP");
1311 #if LINUX_VERSION_CODE >= 0x020100
1314 inet
= (struct sock
*) sock
->data
;
1317 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1319 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1326 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
1327 xprt
->cwnd
= RPC_INITCWND
;
1328 #ifdef SOCK_HAS_USER_DATA
1329 inet
->user_data
= xprt
;
1331 xprt
->link
= sock_list
;
1334 xprt
->old_data_ready
= inet
->data_ready
;
1335 xprt
->old_state_change
= inet
->state_change
;
1336 xprt
->old_write_space
= inet
->write_space
;
1337 if (proto
== IPPROTO_UDP
) {
1338 inet
->data_ready
= udp_data_ready
;
1340 inet
->data_ready
= tcp_data_ready
;
1341 inet
->state_change
= tcp_state_change
;
1342 inet
->write_space
= tcp_write_space
;
1345 xprt
->connected
= 1;
1347 /* Set timeout parameters */
1349 xprt
->timeout
= *to
;
1350 xprt
->timeout
.to_current
= to
->to_initval
;
1351 xprt
->timeout
.to_resrvval
= to
->to_maxval
<< 1;
1353 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
1356 xprt
->pending
= RPC_INIT_WAITQ("xprt_pending");
1357 xprt
->sending
= RPC_INIT_WAITQ("xprt_sending");
1358 xprt
->backlog
= RPC_INIT_WAITQ("xprt_backlog");
1359 xprt
->reconn
= RPC_INIT_WAITQ("xprt_reconn");
1361 /* initialize free list */
1362 for (i
= 0, req
= xprt
->slot
; i
< RPC_MAXREQS
-1; i
++, req
++)
1363 req
->rq_next
= req
+ 1;
1364 req
->rq_next
= NULL
;
1365 xprt
->free
= xprt
->slot
;
1367 dprintk("RPC: created transport %p\n", xprt
);
1370 * TCP requires that the rpc I/O daemon be present
1372 if(proto
==IPPROTO_TCP
)
1378 * Create and initialize an RPC client given an open file.
1379 * This is obsolete now.
1383 xprt_create(struct file
*file
, struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1385 struct rpc_xprt
*xprt
;
1386 struct socket
*sock
;
1390 printk("RPC: file == NULL in xprt_create!\n");
1394 sock
= &file
->f_inode
->u
.socket_i
;
1395 if (sock
->ops
->family
!= PF_INET
) {
1396 printk(KERN_WARNING
"RPC: only INET sockets supported\n");
1400 proto
= (sock
->type
== SOCK_DGRAM
)? IPPROTO_UDP
: IPPROTO_TCP
;
1401 if ((xprt
= xprt_setup(sock
, proto
, ap
, to
)) != NULL
) {
1411 * Bind to a reserved port
1414 xprt_bindresvport(struct socket
*sock
)
1416 struct sockaddr_in myaddr
;
1419 memset(&myaddr
, 0, sizeof(myaddr
));
1420 myaddr
.sin_family
= AF_INET
;
1423 myaddr
.sin_port
= htons(port
);
1424 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
1426 } while (err
== -EADDRINUSE
&& --port
> 0);
1429 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
1435 * Create a client socket given the protocol and peer address.
1437 static struct socket
*
1438 xprt_create_socket(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1440 struct socket
*sock
;
1443 dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
1444 sap
? ntohl(sap
->sin_addr
.s_addr
) : 0,
1445 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
1447 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1448 if ((err
= sock_create(PF_INET
, type
, proto
, &sock
)) < 0) {
1449 printk("RPC: can't create socket (%d).\n", -err
);
1453 /* If the caller has root privs, bind to a reserved port */
1454 if (!current
->fsuid
&& xprt_bindresvport(sock
) < 0)
1457 if (type
== SOCK_STREAM
&& sap
) {
1458 err
= sock
->ops
->connect(sock
, (struct sockaddr
*) sap
,
1461 printk("RPC: TCP connect failed (%d).\n", -err
);
1474 * Create an RPC client transport given the protocol and peer address.
1477 xprt_create_proto(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1479 struct socket
*sock
;
1480 struct rpc_xprt
*xprt
;
1482 dprintk("RPC: xprt_create_proto called\n");
1484 if (!(sock
= xprt_create_socket(proto
, sap
, to
)))
1487 if (!(xprt
= xprt_setup(sock
, proto
, sap
, to
)))
1494 * Prepare for transport shutdown.
1497 xprt_shutdown(struct rpc_xprt
*xprt
)
1500 rpc_wake_up(&xprt
->sending
);
1501 rpc_wake_up(&xprt
->pending
);
1502 rpc_wake_up(&xprt
->backlog
);
1503 rpc_wake_up(&xprt
->reconn
);
1507 * Destroy an RPC transport, killing off all requests.
1510 xprt_destroy(struct rpc_xprt
*xprt
)
1512 #ifndef SOCK_HAS_USER_DATA
1513 struct rpc_xprt
**q
;
1515 for (q
= &sock_list
; *q
&& *q
!= xprt
; q
= &((*q
)->link
))
1518 printk(KERN_WARNING
"xprt_destroy: unknown socket!\n");
1519 return -EIO
; /* why is there no EBUGGYSOFTWARE */
1524 dprintk("RPC: destroying transport %p\n", xprt
);