/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 */
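/*
 * Illustrative sketch (not part of the original source): how a
 * synchronous caller might drive the steps described above.
 * encode_rpc_message() is a hypothetical stand-in for the caller's
 * marshalling code.
 *
 *	if (xprt_reserve(task) < 0)	// get a slot, or sleep on the backlog
 *		return;
 *	encode_rpc_message(task->tk_rqstp);
 *	xprt_call(task);		// transmit, arm timer, wait for XID match
 *	...				// woken with the reply, or -ETIMEDOUT
 *	xprt_release(task);		// pass the slot on to backlogged tasks
 */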
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/net.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <asm/uaccess.h>

#define SOCK_HAS_USER_DATA
#ifndef SOCK_HAS_USER_DATA
static struct rpc_xprt *	sock_list = NULL;
#endif

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	xprt_transmit_status(struct rpc_task *task);
static void	xprt_receive_status(struct rpc_task *task);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_reconn_timeout(struct rpc_task *task);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
					struct rpc_timeout *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * debugging).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8	*buf = (u8 *) packet;
	int	j;

	dprintk("RPC:      %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt	*xprt;

	for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
		;
	return xprt;
#else
	return (struct rpc_xprt *) sk->user_data;
#endif
}
/*
 *	Adjust the iovec to move on 'n' bytes
 */

extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
	struct iovec *iv=msg->msg_iov;

	/*
	 *	Eat any sent iovecs
	 */

	while(iv->iov_len < amount)
	{
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	/*
	 *	And chew down the partial one
	 */

	niv[0].iov_len = iv->iov_len-amount;
	niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
	iv++;

	/*
	 *	And copy any others
	 */

	for(amount=1;amount<msg->msg_iovlen; amount++)
		niv[amount]=*iv++;

	msg->msg_iov=niv;
}
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
	struct iovec	niv[MAX_IOVEC];

	xprt_pktdump("packet data:",
			xprt->snd_buf.io_vec->iov_base,
			xprt->snd_buf.io_vec->iov_len);

	msg.msg_flags   = MSG_DONTWAIT;
	msg.msg_iov	= xprt->snd_buf.io_vec;
	msg.msg_iovlen	= xprt->snd_buf.io_nr;
	msg.msg_name	= (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;

	/* Don't repeat bytes */
	if (xprt->snd_sent)
		xprt_move_iov(&msg, niv, xprt->snd_sent);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
	set_fs(oldfs);

	dprintk("RPC:      xprt_sendmsg(%d) = %d\n",
			xprt->snd_buf.io_len, result);

	if (result >= 0) {
		xprt->snd_buf.io_len -= result;
		xprt->snd_sent += result;
		return result;
	}

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}
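/*
 * Illustrative note (not in the original): on a stream socket a short
 * send is normal.  snd_buf.io_len counts bytes still unsent and
 * snd_sent counts bytes already sent, so if 300 of 500 bytes went out,
 * the next call skips the first 300 bytes via xprt_move_iov() and asks
 * sock_sendmsg() for the remaining 200 only.
 */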
/*
 * Read data from socket
 */
static int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
{
	struct socket	*sock = xprt->sock;
	struct sockaddr_in sin;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
#if LINUX_VERSION_CODE < 0x020100
	int		alen = sizeof(sin);
#endif

#if LINUX_VERSION_CODE >= 0x020100
	msg.msg_flags   = MSG_DONTWAIT;
	msg.msg_iov	= iov;
	msg.msg_iovlen	= nr;
	msg.msg_name	= &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);
#else
	msg.msg_iov	= iov;
	msg.msg_iovlen	= nr;
	msg.msg_name	= &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
	set_fs(oldfs);
#endif

	dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",
			iov, len, result);
	return result;
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd = xprt->cwnd;

	if (result >= 0) {
		if (xprt->cong < cwnd || jiffies < xprt->congtime)
			return;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;

		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}

	xprt->cwnd = cwnd;
}
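/*
 * Worked example (illustrative; assumes RPC_CWNDSCALE represents one
 * request slot in fixed-point form): each reply grows cwnd by roughly
 * RPC_CWNDSCALE^2 / cwnd, i.e. about one slot per full window of
 * replies, while a timeout halves cwnd.  That is the classic
 * additive-increase/multiplicative-decrease scheme, with congtime
 * rate-limiting how often the window may grow.
 */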
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_exponential)
		to->to_current <<= 1;
	else
		to->to_current += to->to_increment;
	if (to->to_maxval && to->to_current >= to->to_maxval) {
		to->to_current = to->to_maxval;
	}
	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return (to->to_retries)--;
}
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct sock	*sk = xprt->inet;

#ifdef SOCK_HAS_USER_DATA
	sk->user_data    = NULL;
#endif
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;

	if (xprt->file)
		fput(xprt->file);
	else
		sock_release(xprt->sock);
	/*
	 *	TCP doesn't require the rpciod now - other things may
	 *	but rpciod handles that not us.
	 */
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC:      disconnected transport %p\n", xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	rpc_wake_up_status(&xprt->sending, -ENOTCONN);
}
/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock;
	struct sock	*inet;
	int		status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
				task->tk_pid, xprt, xprt->connected);

	if (xprt->connecting) {
		task->tk_timeout = xprt->timeout.to_maxval;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		return;
	}
	xprt->connecting = 1;

	/* Create an unconnected socket */
	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
		goto defer;

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif
	inet->data_ready   = xprt->inet->data_ready;
	inet->state_change = xprt->inet->state_change;
	inet->write_space  = xprt->inet->write_space;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data    = xprt;
#endif

	dprintk("RPC: %4d closing old socket\n", task->tk_pid);
	xprt_disconnect(xprt);
	xprt_close(xprt);

	/* Reset to new socket and default congestion */
	xprt->sock = sock;
	xprt->inet = inet;
	xprt->cwnd = RPC_INITCWND;

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);

	if (status != -EINPROGRESS && status != -EALREADY) {
		printk("RPC: TCP connect error %d!\n", -status);
		goto defer;
	}

	dprintk("RPC: %4d connect status %d connected %d\n",
			task->tk_pid, status, xprt->connected);
	task->tk_timeout = 60 * HZ;

	if (!xprt->connected) {
		rpc_sleep_on(&xprt->reconn, task,
				xprt_reconn_status, xprt_reconn_timeout);
		return;
	}

	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	return;

defer:
	task->tk_timeout = 30 * HZ;
	rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
	xprt->connecting = 0;
}
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_status %d\n",
				task->tk_pid, task->tk_status);
	if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
		task->tk_timeout = 30 * HZ;
		rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
	}
}
/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_timeout(struct rpc_task *task)
{
	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
				task->tk_pid, task->tk_status);
	task->tk_status = -ENOTCONN;
	task->tk_xprt->connecting = 0;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * Look up the RPC request corresponding to a reply.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task	*head, *task;
	struct rpc_rqst	*req;
	int		safe = 0;

	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				return req;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				return NULL;
			}
		} while (task != head);
	}
	dprintk("RPC:      unknown XID %08x in reply.\n", xid);
	return NULL;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;

	req->rq_rlen  = copied;
	req->rq_gotit = 1;

	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (nextstat < jiffies) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	/* ... and wake up the process. */
	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;

	rpc_wake_up_task(task);
}
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	struct iovec	iov[MAX_IOVEC];
	mm_segment_t	oldfs;
	int		err, repsize, copied;

	dprintk("RPC:      udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC:      udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		return;

	repsize = skb->len - 8;	/* don't account for UDP header */
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up the request corresponding to the given XID */
	if (!(rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + 8))))
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:", (u32 *) (skb->h.raw+8), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	/* Okay, we have it. Copy datagram... */
	memcpy(iov, rovr->rq_rvec, rovr->rq_rnr * sizeof(iov[0]));
	oldfs = get_fs(); set_fs(get_ds());
	skb_copy_datagram_iovec(skb, 8, iov, copied);
	set_fs(oldfs);

	xprt_complete_rqst(xprt, rovr, copied);

dropit:
	skb_free_datagram(sk, skb);
}
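/*
 * Illustrative note (not in the original): skb->h.raw points at the
 * UDP header here, which is why the code skips 8 bytes everywhere:
 *
 *	skb->h.raw + 0 : UDP header (ports, length, checksum -- 8 bytes)
 *	skb->h.raw + 8 : RPC XID (big-endian u32) used for the lookup
 *	skb->h.raw + 12: rest of the RPC reply
 */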
/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom twice--
 * first receiving the record marker and XID, then the data.
 *
 * The optimal solution would be a RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static inline int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req;
	struct iovec	*iov;
	struct iovec	riov;
	u8		dummy[64];
	u32		offset;
	int		result, maxcpy, reclen, avail, want;

	dprintk("RPC:      tcp_input_record\n");

	offset = xprt->tcp_offset;
	result = -EAGAIN;
	if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
		want = (xprt->tcp_more? 4 : 8) - offset;
		dprintk("RPC:      reading header (%d bytes)\n", want);
		riov.iov_base = xprt->tcp_recm.data + offset;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		/* Get the record length and mask out the more_fragments bit */
		reclen = ntohl(xprt->tcp_reclen);
		dprintk("RPC:      reclen %08x\n", reclen);
		xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
		if (!(reclen &= 0x7fffffff)) {
			printk(KERN_NOTICE "RPC: empty TCP record.\n");
			return -ENOTCONN;	/* break connection */
		}
		xprt->tcp_total += reclen;
		xprt->tcp_reclen = reclen;

		dprintk("RPC:      got xid %08x reclen %d morefrags %d\n",
			xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
		if (!xprt->tcp_copied
		 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
			iov = xprt->tcp_iovec;
			memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
			*(u32 *)iov->iov_base = req->rq_xid;
			iov->iov_base += 4;
			iov->iov_len  -= 4;
			xprt->tcp_copied = 4;
			xprt->tcp_rqstp  = req;
		}
	} else {
		reclen = xprt->tcp_reclen;
	}

	avail = reclen - (offset - 4);
	if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
	 && req->rq_task->tk_rpcwait == &xprt->pending) {
		want = MIN(req->rq_rlen - xprt->tcp_copied, avail);

		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);
		result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
		if (result < 0)
			goto done;
		xprt->tcp_copied += result;
		offset += result;
		avail  -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
		if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
			dprintk("RPC: %4d received reply complete\n",
					req->rq_task->tk_pid);
			xprt_complete_rqst(xprt, req, xprt->tcp_total);
			xprt->tcp_copied = 0;
			xprt->tcp_rqstp  = NULL;
		}
		/* Request must be re-encoded before retransmit */
	}

	/* Skip over any trailing bytes on short reads */
	while (avail > 0) {
		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len  = want;
		dprintk("RPC:      TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		avail  -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}
	}
	if (!xprt->tcp_more)
		xprt->tcp_total = 0;
	offset = 0;

done:
	dprintk("RPC:      tcp_input_record done (off %d total %d copied %d)\n",
			offset, xprt->tcp_total, xprt->tcp_copied);
	xprt->tcp_offset = offset;
	return result;
}
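/*
 * Illustrative note (not in the original): the 4-byte record marker
 * parsed above is the RPC-over-TCP framing of RFC 1057: the top bit
 * is the "last fragment" flag, the low 31 bits the fragment length.
 * A marker of 0x800000d0 therefore means "final fragment, 208 bytes
 * follow"; tcp_more is the inverse of the flag.  xprt_transmit()
 * builds the same marker on the send side.
 */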
/*
 *	TCP task queue stuff
 */

static struct rpc_xprt *rpc_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */
/*
 *	This is protected from tcp_data_ready and the stack as it's run
 *	inside of the RPC I/O daemon
 */
void rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 *	Empty each pending socket
	 */

	while((xprt=rpc_xprt_pending)!=NULL)
	{
		int safe_retry=0;

		rpc_xprt_pending=xprt->rx_pending;
		xprt->rx_pending_flag=0;

		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do
		{
			if (safe_retry++ > 50)
				break;
			result = tcp_input_record(xprt);
		}
		while (result >= 0);

		switch (result) {
			case -EAGAIN:
				continue;
			case -ENOTCONN:
			case -EPIPE:
				xprt_disconnect(xprt);
				continue;
			default:
				printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
					result);
		}
	}
}
/*
 *	Ask the RPC I/O daemon to pick up the pending transports.
 */
extern inline void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
/*
 *	data_ready callback for TCP. We can't just jump into the
 *	tcp recvmsg functions inside of the network receive bh or
 *	bad things occur. We queue it to pick up after networking
 *	is done.
 */

static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt	*xprt;

	dprintk("RPC:      tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
	{
		printk("Not a socket with xprt %p\n", sk);
		return;
	}
	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);
	/*
	 *	If we are not waiting for the RPC bh run then
	 *	we are now
	 */
	if (!xprt->rx_pending_flag)
	{
		dprintk("RPC:     xprt queue\n");
		if(rpc_xprt_pending==NULL)
			tcp_rpciod_queue();
		xprt->rx_pending_flag=1;
		xprt->rx_pending=rpc_xprt_pending;
		rpc_xprt_pending=xprt;
	}
	else
		dprintk("RPC:     xprt queued already %p\n", xprt);
}
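/*
 * Illustrative note (not in the original): this is a classic bottom-
 * half deferral pattern.  tcp_data_ready() runs in the network bh, so
 * it only links the transport onto rpc_xprt_pending and kicks rpciod;
 * rpciod_tcp_dispatcher() then drains the chain and does the actual
 * recvmsg work in the daemon's process context.
 */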
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);

	if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
		xprt->connected = 1;
		xprt->connecting = 0;
		rpc_wake_up(&xprt->reconn);
	} else if (sk->zapped) {
		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		rpc_wake_up_status(&xprt->sending, -ENOTCONN);
		rpc_wake_up_status(&xprt->reconn,  -ENOTCONN);
	}
}
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if(xprt->snd_sent && xprt->snd_task)
		printk("write space\n");
	if(xprt->write_space == 0)
	{
		xprt->write_space = 1;
		if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
		{
			if(xprt->snd_sent)
				printk("Write wakeup snd_sent =%d\n",
					xprt->snd_sent);
			rpc_wake_up_task(xprt->snd_task);
		}
	}
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

	dprintk("RPC: %4d xprt_timer (%s request)\n", task->tk_pid,
			task->tk_rqstp? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * (Partly) transmit the RPC packet
 * Note that task->tk_status is either 0 or negative on return.
 * Only when the reply is received will the status be set to a
 * positive value.
 */
static inline int
xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	int		result;

	task->tk_status = 0;
	if ((result = xprt_sendmsg(xprt)) >= 0) {
		if (!xprt->snd_buf.io_len || !xprt->stream) {
			rpc_wake_up_next(&xprt->sending);
			return req->rq_slen;
		}
		result = -EAGAIN;
	} else if (xprt->stream) {
		if (result == -ENOTCONN || result == -EPIPE) {
			xprt_disconnect(xprt);
			result = -ENOTCONN;
		}
	}
	return task->tk_status = result;
}
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_timeout *timeo;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	/*DEBUG*/int ac_debug=xprt->snd_sent;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
				*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}

	/* If we're not already in the process of transmitting our call,
	 * set up everything as needed. */
	if (xprt->snd_task != task) {
		/* Write the record marker */
		if (xprt->stream) {
			u32	marker;

			if (!xprt->connected) {
				task->tk_status = -ENOTCONN;
				return;
			}
			marker = htonl(0x80000000|(req->rq_slen-4));
			*((u32 *) req->rq_svec[0].iov_base) = marker;
		}

		/* Reset timeout parameters */
		timeo = &req->rq_timeout;
		if (timeo->to_retries < 0) {
			dprintk("RPC: %4d xprt_transmit reset timeo\n",
						task->tk_pid);
			timeo->to_retries = xprt->timeout.to_retries;
			timeo->to_current = timeo->to_initval;
		}

		req->rq_xtime = jiffies;

		/* Check whether another task is already transmitting */
		if (xprt->snd_task) {
			dprintk("RPC: %4d TCP write queue full (task %d)\n",
					task->tk_pid, xprt->snd_task->tk_pid);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			return;
		}
		xprt->snd_buf  = req->rq_snd_buf;
		xprt->snd_task = task;
		xprt->snd_sent = 0;
		/*DEBUG*/ac_debug = 0;
	}

	/* For fast networks/servers we have to put the request on
	 * the pending list now:
	 */
	rpc_add_wait_queue(&xprt->pending, task);
	task->tk_callback = NULL;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	for (;;) {
		xprt->write_space = 0;
		if (xprt_transmit_some(xprt, task) != -EAGAIN) {
			dprintk("RPC: %4d xmit complete\n", task->tk_pid);
			xprt->snd_task = NULL;
			if (ac_debug)
				printk("Partial xmit finished\n");
			return;
		}

		/*d*/printk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, xprt->snd_buf.io_len,
				req->rq_slen);
		task->tk_status = 0;
		if (!xprt->write_space) {
			/* Remove from pending */
			rpc_remove_wait_queue(task);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			return;
		}
	}
}
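/*
 * Illustrative note (not in the original): the loop above avoids a
 * lost-wakeup race.  write_space is cleared *before* each send; if
 * xprt_sendmsg() returns -EAGAIN but tcp_write_space() fired in the
 * meantime (write_space back to 1), we simply retry instead of
 * sleeping on a wakeup that has already happened.
 */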
/*
 * This callback is invoked when the sending task is forced to sleep
 * because the TCP write buffers are full
 */
static void
xprt_transmit_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_client->cl_xprt;

	dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
	if (xprt->snd_task == task)
	{
		if (task->tk_status < 0)
		{
			xprt->snd_task = NULL;
			xprt_disconnect(xprt);
		}
		else
			xprt_transmit(task);
	}
}
/*
 * Wait for the reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
	if (xprt->connected == 0) {
		task->tk_status = -ENOTCONN;
		return;
	}

	/*
	 * Wait until rq_gotit goes non-null, or timeout elapsed.
	 */
	task->tk_timeout = req->rq_timeout.to_current;

	if (!req->rq_gotit) {
		rpc_sleep_on(&xprt->pending, task,
				xprt_receive_status, xprt_timer);
	}

	dprintk("RPC: %4d xprt_receive returns %d\n",
				task->tk_pid, task->tk_status);
}
static void
xprt_receive_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
		xprt->tcp_rqstp = NULL;
}
/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	/* We already have an initialized request. */
	if (task->tk_rqstp)
		return 0;

	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
				task->tk_pid, xprt->cong, xprt->cwnd);
	if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
		xprt_reserve_status(task);
		task->tk_timeout = 0;
	} else if (!task->tk_timeout) {
		task->tk_status = -ENOBUFS;
	} else {
		dprintk("RPC:      xprt_reserve waiting on backlog\n");
		rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
	}
	dprintk("RPC: %4d xprt_reserve returns %d\n",
				task->tk_pid, task->tk_status);
	return task->tk_status;
}
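/*
 * Illustrative note (not in the original): xprt->cong counts in-flight
 * requests in units of RPC_CWNDSCALE -- xprt_reserve_status() adds one
 * unit per slot taken and xprt_release() subtracts it -- while
 * RPCXPRT_CONGESTED() (defined in the headers) compares cong against
 * cwnd, so the window from xprt_adjust_cwnd() directly caps concurrency.
 */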
/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (xprt->shutdown) {
		task->tk_status = -EIO;
	} else if (task->tk_status < 0) {
		/* NOP */
	} else if (task->tk_rqstp) {
		/* We've already been given a request slot: NOP */
	} else if (!RPCXPRT_CONGESTED(xprt)) {
		/* OK: There's room for us. Grab a free slot and bump
		 * congestion value */
		if (!(req = xprt->free))
			goto bad_list;
		if (req->rq_xid)
			goto bad_used;
		xprt->free     = req->rq_next;
		xprt->cong    += RPC_CWNDSCALE;
		task->tk_rqstp = req;
		req->rq_next   = NULL;
		xprt_request_init(task, xprt);
	} else {
		task->tk_status = -EAGAIN;
	}

	if (xprt->free && !RPCXPRT_CONGESTED(xprt))
		rpc_wake_up_next(&xprt->backlog);

	return;

bad_list:
	printk("RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
		task->tk_pid, xprt->cong, xprt->cwnd);
	task->tk_status = -EIO;
	return;

bad_used:
	printk("RPC: used rqst slot %p on free list!\n", req);
	task->tk_status = -EIO;
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	static u32	xid = 0;

	if (!xid)
		xid = jiffies;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	task->tk_status = 0;
	req->rq_gotit   = 0;
	req->rq_timeout = xprt->timeout;
	req->rq_task	= task;
	req->rq_xprt    = xprt;
	req->rq_xid     = xid++;
}
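/*
 * Illustrative note (not in the original): seeding the XID counter
 * from the clock and then simply incrementing it gives each request
 * a transport-unique identifier, which is all xprt_lookup_rqst()
 * needs to match an incoming reply to its pending request.
 */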
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
#ifdef RPC_DEBUG
		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
#endif
		rpc_del_timer(task);
		rpc_remove_wait_queue(task);
	}

	/* Decrease congestion value. If congestion threshold is not yet
	 * reached, pass on the request slot.
	 * This looks kind of kludgy, but it guarantees backlogged requests
	 * are served in order.
	 */
	xprt->cong -= RPC_CWNDSCALE;
	if (!RPCXPRT_CONGESTED(xprt)) {
		struct rpc_task	*next = rpc_wake_up_next(&xprt->backlog);

		if (next && next->tk_rqstp == 0) {
			xprt->cong += RPC_CWNDSCALE;
			next->tk_rqstp = req;
			xprt_request_init(next, xprt);
			return;
		}
	}

	req->rq_next = xprt->free;
	xprt->free   = req;
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5,  5 * HZ);
	else
		xprt_set_timeout(to, 5, 15 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current   =
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_resrvval  = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}
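/*
 * Worked example (illustrative): the UDP default above calls
 * xprt_set_timeout(to, 5, 5 * HZ), giving to_initval == to_increment
 * == 5s, to_maxval == 25s and 5 retries.  With to_exponential == 0,
 * xprt_adjust_timeout() adds to_increment each time, so successive
 * waits run 5s, 10s, 15s, 20s, 25s before the retry count is used up.
 */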
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
			struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	struct sock	*inet;
	int		i;

	dprintk("RPC:      setting up %s transport...\n",
				proto == IPPROTO_UDP? "UDP" : "TCP");

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->sock = sock;
	xprt->inet = inet;
	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	xprt->cwnd = RPC_INITCWND;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data = xprt;
#else
	xprt->link = sock_list;
	sock_list = xprt;
#endif
	xprt->old_data_ready	= inet->data_ready;
	xprt->old_state_change	= inet->state_change;
	xprt->old_write_space	= inet->write_space;
	if (proto == IPPROTO_UDP) {
		inet->data_ready = udp_data_ready;
	} else {
		inet->data_ready   = tcp_data_ready;
		inet->state_change = tcp_state_change;
		inet->write_space  = tcp_write_space;
	}
	xprt->connected = 1;

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
		xprt->timeout.to_resrvval = to->to_maxval << 1;
	} else {
		xprt_default_timeout(&xprt->timeout, xprt->prot);
	}

	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	dprintk("RPC:      created transport %p\n", xprt);

	/*
	 *	TCP requires the rpc I/O daemon is present
	 */
	if (proto == IPPROTO_TCP)
		rpciod_up();

	return xprt;
}
/*
 * Create and initialize an RPC client given an open file.
 * This is obsolete now.
 */
struct rpc_xprt *
xprt_create(struct file *file, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;
	int		proto;

	if (!file) {
		printk("RPC: file == NULL in xprt_create!\n");
		return NULL;
	}

	sock = &file->f_inode->u.socket_i;
	if (sock->ops->family != PF_INET) {
		printk(KERN_WARNING "RPC: only INET sockets supported\n");
		return NULL;
	}

	proto = (sock->type == SOCK_DGRAM)? IPPROTO_UDP : IPPROTO_TCP;
	if ((xprt = xprt_setup(sock, proto, ap, to)) != NULL) {
		xprt->file = file;
		file->f_count++;
	}

	return xprt;
}
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int		err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}
/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC:      xprt_create_socket(%08lx, %s %d)\n",
			   sap? ntohl(sap->sin_addr.s_addr) : 0,
			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		return NULL;
	}

	/* If the caller has root privs, bind to a reserved port */
	if (!current->fsuid && xprt_bindresvport(sock) < 0)
		goto failed;

	if (type == SOCK_STREAM && sap) {
		err = sock->ops->connect(sock, (struct sockaddr *) sap,
						sizeof(*sap), 0);
		if (err < 0) {
			printk("RPC: TCP connect failed (%d).\n", -err);
			goto failed;
		}
	}

	return sock;

failed:
	sock_release(sock);
	return NULL;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	struct rpc_xprt	*xprt;

	dprintk("RPC:      xprt_create_proto called\n");

	if (!(sock = xprt_create_socket(proto, sap, to)))
		return NULL;

	if (!(xprt = xprt_setup(sock, proto, sap, to)))
		sock_release(sock);

	return xprt;
}
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	rpc_wake_up(&xprt->reconn);
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt	**q;

	for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
		;
	if (!*q) {
		printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
		return -EIO;	/* why is there no EBUGGYSOFTWARE */
	}
	*q = xprt->link;
#endif

	dprintk("RPC:      destroying transport %p\n", xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}