2 * linux/net/sunrpc/xprt.c
4 * This is a generic RPC call interface supporting congestion avoidance,
5 * and asynchronous calls.
7 * The interface works like this:
9 * - When a process places a call, it allocates a request slot if
10 * one is available. Otherwise, it sleeps on the backlog queue
12 * - Next, the caller puts together the RPC message, stuffs it into
13 * the request struct, and calls xprt_call().
14 * - xprt_call transmits the message and installs the caller on the
15 * socket's wait list. At the same time, it installs a timer that
16 * is run after the packet's timeout has expired.
17 * - When a packet arrives, the data_ready handler walks the list of
18 * pending requests for that socket. If a matching XID is found, the
19 * caller is woken up, and the timer removed.
20 * - When no reply arrives within the timeout interval, the timer is
21 * fired by the kernel and runs xprt_timer(). It either adjusts the
22 * timeout values (minor timeout) or wakes up the caller with a status
24 * - When the caller receives a notification from RPC that a reply arrived,
25 * it should release the RPC slot, and process the reply.
26 * If the call timed out, it may choose to retry the operation by
27 * adjusting the initial timeout value, and simply calling rpc_call
30 * Support for async RPC is done through a set of RPC-specific scheduling
31 * primitives that `transparently' work for processes as well as async
32 * tasks that rely on callbacks.
34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
38 * TCP NFS related read + write fixes
39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
41 * Rewrite of larges part of the code in order to stabilize TCP stuff.
42 * Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
46 #include <linux/types.h>
47 #include <linux/slab.h>
48 #include <linux/capability.h>
49 #include <linux/sched.h>
50 #include <linux/errno.h>
51 #include <linux/socket.h>
53 #include <linux/net.h>
55 #include <linux/udp.h>
56 #include <linux/tcp.h>
57 #include <linux/sunrpc/clnt.h>
58 #include <linux/file.h>
59 #include <linux/workqueue.h>
60 #include <linux/random.h>
63 #include <net/checksum.h>
72 # undef RPC_DEBUG_DATA
73 # define RPCDBG_FACILITY RPCDBG_XPRT
76 #define XPRT_MAX_BACKOFF (8)
77 #define XPRT_IDLE_TIMEOUT (5*60*HZ)
78 #define XPRT_MAX_RESVPORT (800)
83 static void xprt_request_init(struct rpc_task
*, struct rpc_xprt
*);
84 static inline void do_xprt_reserve(struct rpc_task
*);
85 static void xprt_disconnect(struct rpc_xprt
*);
86 static void xprt_connect_status(struct rpc_task
*task
);
87 static struct rpc_xprt
* xprt_setup(int proto
, struct sockaddr_in
*ap
,
88 struct rpc_timeout
*to
);
89 static struct socket
*xprt_create_socket(struct rpc_xprt
*, int, int);
90 static void xprt_bind_socket(struct rpc_xprt
*, struct socket
*);
91 static int __xprt_get_cong(struct rpc_xprt
*, struct rpc_task
*);
93 static int xprt_clear_backlog(struct rpc_xprt
*xprt
);
97 * Print the buffer contents (first 128 bytes only--just enough for
101 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
103 u8
*buf
= (u8
*) packet
;
106 dprintk("RPC: %s\n", msg
);
107 for (j
= 0; j
< count
&& j
< 128; j
+= 4) {
111 dprintk("0x%04x ", j
);
113 dprintk("%02x%02x%02x%02x ",
114 buf
[j
], buf
[j
+1], buf
[j
+2], buf
[j
+3]);
120 xprt_pktdump(char *msg
, u32
*packet
, unsigned int count
)
127 * Look up RPC transport given an INET socket
129 static inline struct rpc_xprt
*
130 xprt_from_sock(struct sock
*sk
)
132 return (struct rpc_xprt
*) sk
->sk_user_data
;
136 * Serialize write access to sockets, in order to prevent different
137 * requests from interfering with each other.
138 * Also prevents TCP socket connects from colliding with writes.
141 __xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
143 struct rpc_rqst
*req
= task
->tk_rqstp
;
145 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
)) {
146 if (task
== xprt
->snd_task
)
152 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
)) {
153 xprt
->snd_task
= task
;
155 req
->rq_bytes_sent
= 0;
160 smp_mb__before_clear_bit();
161 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
162 smp_mb__after_clear_bit();
164 dprintk("RPC: %4d failed to lock socket %p\n", task
->tk_pid
, xprt
);
165 task
->tk_timeout
= 0;
166 task
->tk_status
= -EAGAIN
;
167 if (req
&& req
->rq_ntrans
)
168 rpc_sleep_on(&xprt
->resend
, task
, NULL
, NULL
);
170 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
175 xprt_lock_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
179 spin_lock_bh(&xprt
->sock_lock
);
180 retval
= __xprt_lock_write(xprt
, task
);
181 spin_unlock_bh(&xprt
->sock_lock
);
187 __xprt_lock_write_next(struct rpc_xprt
*xprt
)
189 struct rpc_task
*task
;
191 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
))
193 if (!xprt
->nocong
&& RPCXPRT_CONGESTED(xprt
))
195 task
= rpc_wake_up_next(&xprt
->resend
);
197 task
= rpc_wake_up_next(&xprt
->sending
);
201 if (xprt
->nocong
|| __xprt_get_cong(xprt
, task
)) {
202 struct rpc_rqst
*req
= task
->tk_rqstp
;
203 xprt
->snd_task
= task
;
205 req
->rq_bytes_sent
= 0;
211 smp_mb__before_clear_bit();
212 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
213 smp_mb__after_clear_bit();
217 * Releases the socket for use by other requests.
220 __xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
222 if (xprt
->snd_task
== task
) {
223 xprt
->snd_task
= NULL
;
224 smp_mb__before_clear_bit();
225 clear_bit(XPRT_LOCKED
, &xprt
->sockstate
);
226 smp_mb__after_clear_bit();
227 __xprt_lock_write_next(xprt
);
232 xprt_release_write(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
234 spin_lock_bh(&xprt
->sock_lock
);
235 __xprt_release_write(xprt
, task
);
236 spin_unlock_bh(&xprt
->sock_lock
);
240 * Write data to socket.
243 xprt_sendmsg(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
245 struct socket
*sock
= xprt
->sock
;
246 struct xdr_buf
*xdr
= &req
->rq_snd_buf
;
247 struct sockaddr
*addr
= NULL
;
255 xprt_pktdump("packet data:",
256 req
->rq_svec
->iov_base
,
257 req
->rq_svec
->iov_len
);
259 /* For UDP, we need to provide an address */
261 addr
= (struct sockaddr
*) &xprt
->addr
;
262 addrlen
= sizeof(xprt
->addr
);
264 /* Dont repeat bytes */
265 skip
= req
->rq_bytes_sent
;
267 clear_bit(SOCK_ASYNC_NOSPACE
, &sock
->flags
);
268 result
= xdr_sendpages(sock
, addr
, addrlen
, xdr
, skip
, MSG_DONTWAIT
);
270 dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr
->len
- skip
, result
);
277 /* When the server has died, an ICMP port unreachable message
278 * prompts ECONNREFUSED.
285 /* connection broken */
290 printk(KERN_NOTICE
"RPC: sendmsg returned error %d\n", -result
);
296 * Van Jacobson congestion avoidance. Check if the congestion window
297 * overflowed. Put the task to sleep if this is the case.
300 __xprt_get_cong(struct rpc_xprt
*xprt
, struct rpc_task
*task
)
302 struct rpc_rqst
*req
= task
->tk_rqstp
;
306 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
307 task
->tk_pid
, xprt
->cong
, xprt
->cwnd
);
308 if (RPCXPRT_CONGESTED(xprt
))
311 xprt
->cong
+= RPC_CWNDSCALE
;
316 * Adjust the congestion window, and wake up the next task
317 * that has been sleeping due to congestion
320 __xprt_put_cong(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
)
325 xprt
->cong
-= RPC_CWNDSCALE
;
326 __xprt_lock_write_next(xprt
);
330 * Adjust RPC congestion window
331 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
334 xprt_adjust_cwnd(struct rpc_xprt
*xprt
, int result
)
339 if (result
>= 0 && cwnd
<= xprt
->cong
) {
340 /* The (cwnd >> 1) term makes sure
341 * the result gets rounded properly. */
342 cwnd
+= (RPC_CWNDSCALE
* RPC_CWNDSCALE
+ (cwnd
>> 1)) / cwnd
;
343 if (cwnd
> RPC_MAXCWND(xprt
))
344 cwnd
= RPC_MAXCWND(xprt
);
345 __xprt_lock_write_next(xprt
);
346 } else if (result
== -ETIMEDOUT
) {
348 if (cwnd
< RPC_CWNDSCALE
)
349 cwnd
= RPC_CWNDSCALE
;
351 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
352 xprt
->cong
, xprt
->cwnd
, cwnd
);
357 * Reset the major timeout value
359 static void xprt_reset_majortimeo(struct rpc_rqst
*req
)
361 struct rpc_timeout
*to
= &req
->rq_xprt
->timeout
;
363 req
->rq_majortimeo
= req
->rq_timeout
;
364 if (to
->to_exponential
)
365 req
->rq_majortimeo
<<= to
->to_retries
;
367 req
->rq_majortimeo
+= to
->to_increment
* to
->to_retries
;
368 if (req
->rq_majortimeo
> to
->to_maxval
|| req
->rq_majortimeo
== 0)
369 req
->rq_majortimeo
= to
->to_maxval
;
370 req
->rq_majortimeo
+= jiffies
;
374 * Adjust timeout values etc for next retransmit
376 int xprt_adjust_timeout(struct rpc_rqst
*req
)
378 struct rpc_xprt
*xprt
= req
->rq_xprt
;
379 struct rpc_timeout
*to
= &xprt
->timeout
;
382 if (time_before(jiffies
, req
->rq_majortimeo
)) {
383 if (to
->to_exponential
)
384 req
->rq_timeout
<<= 1;
386 req
->rq_timeout
+= to
->to_increment
;
387 if (to
->to_maxval
&& req
->rq_timeout
>= to
->to_maxval
)
388 req
->rq_timeout
= to
->to_maxval
;
390 pprintk("RPC: %lu retrans\n", jiffies
);
392 req
->rq_timeout
= to
->to_initval
;
394 xprt_reset_majortimeo(req
);
395 /* Reset the RTT counters == "slow start" */
396 spin_lock_bh(&xprt
->sock_lock
);
397 rpc_init_rtt(req
->rq_task
->tk_client
->cl_rtt
, to
->to_initval
);
398 spin_unlock_bh(&xprt
->sock_lock
);
399 pprintk("RPC: %lu timeout\n", jiffies
);
403 if (req
->rq_timeout
== 0) {
404 printk(KERN_WARNING
"xprt_adjust_timeout: rq_timeout = 0!\n");
405 req
->rq_timeout
= 5 * HZ
;
411 * Close down a transport socket
414 xprt_close(struct rpc_xprt
*xprt
)
416 struct socket
*sock
= xprt
->sock
;
417 struct sock
*sk
= xprt
->inet
;
422 write_lock_bh(&sk
->sk_callback_lock
);
426 sk
->sk_user_data
= NULL
;
427 sk
->sk_data_ready
= xprt
->old_data_ready
;
428 sk
->sk_state_change
= xprt
->old_state_change
;
429 sk
->sk_write_space
= xprt
->old_write_space
;
430 write_unlock_bh(&sk
->sk_callback_lock
);
438 xprt_socket_autoclose(void *args
)
440 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)args
;
442 xprt_disconnect(xprt
);
444 xprt_release_write(xprt
, NULL
);
448 * Mark a transport as disconnected
451 xprt_disconnect(struct rpc_xprt
*xprt
)
453 dprintk("RPC: disconnected transport %p\n", xprt
);
454 spin_lock_bh(&xprt
->sock_lock
);
455 xprt_clear_connected(xprt
);
456 rpc_wake_up_status(&xprt
->pending
, -ENOTCONN
);
457 spin_unlock_bh(&xprt
->sock_lock
);
461 * Used to allow disconnection when we've been idle
464 xprt_init_autodisconnect(unsigned long data
)
466 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)data
;
468 spin_lock(&xprt
->sock_lock
);
469 if (!list_empty(&xprt
->recv
) || xprt
->shutdown
)
471 if (test_and_set_bit(XPRT_LOCKED
, &xprt
->sockstate
))
473 spin_unlock(&xprt
->sock_lock
);
474 /* Let keventd close the socket */
475 if (test_bit(XPRT_CONNECTING
, &xprt
->sockstate
) != 0)
476 xprt_release_write(xprt
, NULL
);
478 schedule_work(&xprt
->task_cleanup
);
481 spin_unlock(&xprt
->sock_lock
);
484 static void xprt_socket_connect(void *args
)
486 struct rpc_xprt
*xprt
= (struct rpc_xprt
*)args
;
487 struct socket
*sock
= xprt
->sock
;
490 if (xprt
->shutdown
|| xprt
->addr
.sin_port
== 0)
494 * Start by resetting any existing state
497 sock
= xprt_create_socket(xprt
, xprt
->prot
, xprt
->resvport
);
499 /* couldn't create socket or bind to reserved port;
500 * this is likely a permanent error, so cause an abort */
503 xprt_bind_socket(xprt
, sock
);
504 xprt_sock_setbufsize(xprt
);
511 * Tell the socket layer to start connecting...
513 status
= sock
->ops
->connect(sock
, (struct sockaddr
*) &xprt
->addr
,
514 sizeof(xprt
->addr
), O_NONBLOCK
);
515 dprintk("RPC: %p connect status %d connected %d sock state %d\n",
516 xprt
, -status
, xprt_connected(xprt
), sock
->sk
->sk_state
);
526 rpc_wake_up_status(&xprt
->pending
, status
);
528 rpc_wake_up(&xprt
->pending
);
530 smp_mb__before_clear_bit();
531 clear_bit(XPRT_CONNECTING
, &xprt
->sockstate
);
532 smp_mb__after_clear_bit();
536 * Attempt to connect a TCP socket.
539 void xprt_connect(struct rpc_task
*task
)
541 struct rpc_xprt
*xprt
= task
->tk_xprt
;
543 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task
->tk_pid
,
544 xprt
, (xprt_connected(xprt
) ? "is" : "is not"));
546 if (xprt
->shutdown
) {
547 task
->tk_status
= -EIO
;
550 if (!xprt
->addr
.sin_port
) {
551 task
->tk_status
= -EIO
;
554 if (!xprt_lock_write(xprt
, task
))
556 if (xprt_connected(xprt
))
560 task
->tk_rqstp
->rq_bytes_sent
= 0;
562 task
->tk_timeout
= RPC_CONNECT_TIMEOUT
;
563 rpc_sleep_on(&xprt
->pending
, task
, xprt_connect_status
, NULL
);
564 if (!test_and_set_bit(XPRT_CONNECTING
, &xprt
->sockstate
)) {
565 /* Note: if we are here due to a dropped connection
566 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
569 if (xprt
->sock
!= NULL
)
570 schedule_delayed_work(&xprt
->sock_connect
,
571 RPC_REESTABLISH_TIMEOUT
);
573 schedule_work(&xprt
->sock_connect
);
574 if (!RPC_IS_ASYNC(task
))
575 flush_scheduled_work();
580 xprt_release_write(xprt
, task
);
584 * We arrive here when awoken from waiting on connection establishment.
587 xprt_connect_status(struct rpc_task
*task
)
589 struct rpc_xprt
*xprt
= task
->tk_xprt
;
591 if (task
->tk_status
>= 0) {
592 dprintk("RPC: %4d xprt_connect_status: connection established\n",
597 /* if soft mounted, just cause this RPC to fail */
598 if (RPC_IS_SOFT(task
))
599 task
->tk_status
= -EIO
;
601 switch (task
->tk_status
) {
607 dprintk("RPC: %4d xprt_connect_status: timed out\n",
611 printk(KERN_ERR
"RPC: error %d connecting to server %s\n",
612 -task
->tk_status
, task
->tk_client
->cl_server
);
614 xprt_release_write(xprt
, task
);
618 * Look up the RPC request corresponding to a reply, and then lock it.
620 static inline struct rpc_rqst
*
621 xprt_lookup_rqst(struct rpc_xprt
*xprt
, u32 xid
)
623 struct list_head
*pos
;
624 struct rpc_rqst
*req
= NULL
;
626 list_for_each(pos
, &xprt
->recv
) {
627 struct rpc_rqst
*entry
= list_entry(pos
, struct rpc_rqst
, rq_list
);
628 if (entry
->rq_xid
== xid
) {
637 * Complete reply received.
638 * The TCP code relies on us to remove the request from xprt->pending.
641 xprt_complete_rqst(struct rpc_xprt
*xprt
, struct rpc_rqst
*req
, int copied
)
643 struct rpc_task
*task
= req
->rq_task
;
644 struct rpc_clnt
*clnt
= task
->tk_client
;
646 /* Adjust congestion window */
648 unsigned timer
= task
->tk_msg
.rpc_proc
->p_timer
;
649 xprt_adjust_cwnd(xprt
, copied
);
650 __xprt_put_cong(xprt
, req
);
652 if (req
->rq_ntrans
== 1)
653 rpc_update_rtt(clnt
->cl_rtt
, timer
,
654 (long)jiffies
- req
->rq_xtime
);
655 rpc_set_timeo(clnt
->cl_rtt
, timer
, req
->rq_ntrans
- 1);
660 /* Profile only reads for now */
662 static unsigned long nextstat
;
663 static unsigned long pkt_rtt
, pkt_len
, pkt_cnt
;
666 pkt_len
+= req
->rq_slen
+ copied
;
667 pkt_rtt
+= jiffies
- req
->rq_xtime
;
668 if (time_before(nextstat
, jiffies
)) {
669 printk("RPC: %lu %ld cwnd\n", jiffies
, xprt
->cwnd
);
670 printk("RPC: %ld %ld %ld %ld stat\n",
671 jiffies
, pkt_cnt
, pkt_len
, pkt_rtt
);
672 pkt_rtt
= pkt_len
= pkt_cnt
= 0;
673 nextstat
= jiffies
+ 5 * HZ
;
678 dprintk("RPC: %4d has input (%d bytes)\n", task
->tk_pid
, copied
);
679 list_del_init(&req
->rq_list
);
680 req
->rq_received
= req
->rq_private_buf
.len
= copied
;
682 /* ... and wake up the process. */
683 rpc_wake_up_task(task
);
688 skb_read_bits(skb_reader_t
*desc
, void *to
, size_t len
)
690 if (len
> desc
->count
)
692 if (skb_copy_bits(desc
->skb
, desc
->offset
, to
, len
))
700 skb_read_and_csum_bits(skb_reader_t
*desc
, void *to
, size_t len
)
702 unsigned int csum2
, pos
;
704 if (len
> desc
->count
)
707 csum2
= skb_copy_and_csum_bits(desc
->skb
, pos
, to
, len
, 0);
708 desc
->csum
= csum_block_add(desc
->csum
, csum2
, pos
);
715 * We have set things up such that we perform the checksum of the UDP
716 * packet in parallel with the copies into the RPC client iovec. -DaveM
719 csum_partial_copy_to_xdr(struct xdr_buf
*xdr
, struct sk_buff
*skb
)
724 desc
.offset
= sizeof(struct udphdr
);
725 desc
.count
= skb
->len
- desc
.offset
;
727 if (skb
->ip_summed
== CHECKSUM_UNNECESSARY
)
730 desc
.csum
= csum_partial(skb
->data
, desc
.offset
, skb
->csum
);
731 if (xdr_partial_copy_from_skb(xdr
, 0, &desc
, skb_read_and_csum_bits
) < 0)
733 if (desc
.offset
!= skb
->len
) {
735 csum2
= skb_checksum(skb
, desc
.offset
, skb
->len
- desc
.offset
, 0);
736 desc
.csum
= csum_block_add(desc
.csum
, csum2
, desc
.offset
);
740 if ((unsigned short)csum_fold(desc
.csum
))
744 if (xdr_partial_copy_from_skb(xdr
, 0, &desc
, skb_read_bits
) < 0)
752 * Input handler for RPC replies. Called from a bottom half and hence
756 udp_data_ready(struct sock
*sk
, int len
)
758 struct rpc_task
*task
;
759 struct rpc_xprt
*xprt
;
760 struct rpc_rqst
*rovr
;
762 int err
, repsize
, copied
;
765 read_lock(&sk
->sk_callback_lock
);
766 dprintk("RPC: udp_data_ready...\n");
767 if (!(xprt
= xprt_from_sock(sk
))) {
768 printk("RPC: udp_data_ready request not found!\n");
772 dprintk("RPC: udp_data_ready client %p\n", xprt
);
774 if ((skb
= skb_recv_datagram(sk
, 0, 1, &err
)) == NULL
)
780 repsize
= skb
->len
- sizeof(struct udphdr
);
782 printk("RPC: impossible RPC reply size %d!\n", repsize
);
786 /* Copy the XID from the skb... */
787 xp
= skb_header_pointer(skb
, sizeof(struct udphdr
),
788 sizeof(_xid
), &_xid
);
792 /* Look up and lock the request corresponding to the given XID */
793 spin_lock(&xprt
->sock_lock
);
794 rovr
= xprt_lookup_rqst(xprt
, *xp
);
797 task
= rovr
->rq_task
;
799 dprintk("RPC: %4d received reply\n", task
->tk_pid
);
801 if ((copied
= rovr
->rq_private_buf
.buflen
) > repsize
)
804 /* Suck it into the iovec, verify checksum if not done by hw. */
805 if (csum_partial_copy_to_xdr(&rovr
->rq_private_buf
, skb
))
808 /* Something worked... */
809 dst_confirm(skb
->dst
);
811 xprt_complete_rqst(xprt
, rovr
, copied
);
814 spin_unlock(&xprt
->sock_lock
);
816 skb_free_datagram(sk
, skb
);
818 read_unlock(&sk
->sk_callback_lock
);
822 * Copy from an skb into memory and shrink the skb.
825 tcp_copy_data(skb_reader_t
*desc
, void *p
, size_t len
)
827 if (len
> desc
->count
)
829 if (skb_copy_bits(desc
->skb
, desc
->offset
, p
, len
)) {
830 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n",
836 dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n",
842 * TCP read fragment marker
845 tcp_read_fraghdr(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
850 p
= ((char *) &xprt
->tcp_recm
) + xprt
->tcp_offset
;
851 len
= sizeof(xprt
->tcp_recm
) - xprt
->tcp_offset
;
852 used
= tcp_copy_data(desc
, p
, len
);
853 xprt
->tcp_offset
+= used
;
856 xprt
->tcp_reclen
= ntohl(xprt
->tcp_recm
);
857 if (xprt
->tcp_reclen
& 0x80000000)
858 xprt
->tcp_flags
|= XPRT_LAST_FRAG
;
860 xprt
->tcp_flags
&= ~XPRT_LAST_FRAG
;
861 xprt
->tcp_reclen
&= 0x7fffffff;
862 xprt
->tcp_flags
&= ~XPRT_COPY_RECM
;
863 xprt
->tcp_offset
= 0;
864 /* Sanity check of the record length */
865 if (xprt
->tcp_reclen
< 4) {
866 printk(KERN_ERR
"RPC: Invalid TCP record fragment length\n");
867 xprt_disconnect(xprt
);
869 dprintk("RPC: reading TCP record fragment of length %d\n",
874 tcp_check_recm(struct rpc_xprt
*xprt
)
876 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
877 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
, xprt
->tcp_flags
);
878 if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
879 xprt
->tcp_flags
|= XPRT_COPY_RECM
;
880 xprt
->tcp_offset
= 0;
881 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
) {
882 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
883 xprt
->tcp_flags
|= XPRT_COPY_XID
;
884 xprt
->tcp_copied
= 0;
893 tcp_read_xid(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
898 len
= sizeof(xprt
->tcp_xid
) - xprt
->tcp_offset
;
899 dprintk("RPC: reading XID (%Zu bytes)\n", len
);
900 p
= ((char *) &xprt
->tcp_xid
) + xprt
->tcp_offset
;
901 used
= tcp_copy_data(desc
, p
, len
);
902 xprt
->tcp_offset
+= used
;
905 xprt
->tcp_flags
&= ~XPRT_COPY_XID
;
906 xprt
->tcp_flags
|= XPRT_COPY_DATA
;
907 xprt
->tcp_copied
= 4;
908 dprintk("RPC: reading reply for XID %08x\n",
909 ntohl(xprt
->tcp_xid
));
910 tcp_check_recm(xprt
);
914 * TCP read and complete request
917 tcp_read_request(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
919 struct rpc_rqst
*req
;
920 struct xdr_buf
*rcvbuf
;
924 /* Find and lock the request corresponding to this xid */
925 spin_lock(&xprt
->sock_lock
);
926 req
= xprt_lookup_rqst(xprt
, xprt
->tcp_xid
);
928 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
929 dprintk("RPC: XID %08x request not found!\n",
930 ntohl(xprt
->tcp_xid
));
931 spin_unlock(&xprt
->sock_lock
);
935 rcvbuf
= &req
->rq_private_buf
;
937 if (len
> xprt
->tcp_reclen
- xprt
->tcp_offset
) {
938 skb_reader_t my_desc
;
940 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
941 memcpy(&my_desc
, desc
, sizeof(my_desc
));
943 r
= xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
944 &my_desc
, tcp_copy_data
);
948 r
= xdr_partial_copy_from_skb(rcvbuf
, xprt
->tcp_copied
,
949 desc
, tcp_copy_data
);
952 xprt
->tcp_copied
+= r
;
953 xprt
->tcp_offset
+= r
;
956 /* Error when copying to the receive buffer,
957 * usually because we weren't able to allocate
958 * additional buffer pages. All we can do now
959 * is turn off XPRT_COPY_DATA, so the request
960 * will not receive any additional updates,
962 * Any remaining data from this record will
965 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
966 dprintk("RPC: XID %08x truncated request\n",
967 ntohl(xprt
->tcp_xid
));
968 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
969 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
);
973 dprintk("RPC: XID %08x read %u bytes\n",
974 ntohl(xprt
->tcp_xid
), r
);
975 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
976 xprt
, xprt
->tcp_copied
, xprt
->tcp_offset
, xprt
->tcp_reclen
);
978 if (xprt
->tcp_copied
== req
->rq_private_buf
.buflen
)
979 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
980 else if (xprt
->tcp_offset
== xprt
->tcp_reclen
) {
981 if (xprt
->tcp_flags
& XPRT_LAST_FRAG
)
982 xprt
->tcp_flags
&= ~XPRT_COPY_DATA
;
986 if (!(xprt
->tcp_flags
& XPRT_COPY_DATA
)) {
987 dprintk("RPC: %4d received reply complete\n",
988 req
->rq_task
->tk_pid
);
989 xprt_complete_rqst(xprt
, req
, xprt
->tcp_copied
);
991 spin_unlock(&xprt
->sock_lock
);
992 tcp_check_recm(xprt
);
996 * TCP discard extra bytes from a short read
999 tcp_read_discard(struct rpc_xprt
*xprt
, skb_reader_t
*desc
)
1003 len
= xprt
->tcp_reclen
- xprt
->tcp_offset
;
1004 if (len
> desc
->count
)
1007 desc
->offset
+= len
;
1008 xprt
->tcp_offset
+= len
;
1009 dprintk("RPC: discarded %u bytes\n", len
);
1010 tcp_check_recm(xprt
);
1014 * TCP record receive routine
1015 * We first have to grab the record marker, then the XID, then the data.
1018 tcp_data_recv(read_descriptor_t
*rd_desc
, struct sk_buff
*skb
,
1019 unsigned int offset
, size_t len
)
1021 struct rpc_xprt
*xprt
= rd_desc
->arg
.data
;
1022 skb_reader_t desc
= {
1029 dprintk("RPC: tcp_data_recv\n");
1031 /* Read in a new fragment marker if necessary */
1032 /* Can we ever really expect to get completely empty fragments? */
1033 if (xprt
->tcp_flags
& XPRT_COPY_RECM
) {
1034 tcp_read_fraghdr(xprt
, &desc
);
1037 /* Read in the xid if necessary */
1038 if (xprt
->tcp_flags
& XPRT_COPY_XID
) {
1039 tcp_read_xid(xprt
, &desc
);
1042 /* Read in the request data */
1043 if (xprt
->tcp_flags
& XPRT_COPY_DATA
) {
1044 tcp_read_request(xprt
, &desc
);
1047 /* Skip over any trailing bytes on short reads */
1048 tcp_read_discard(xprt
, &desc
);
1049 } while (desc
.count
);
1050 dprintk("RPC: tcp_data_recv done\n");
1051 return len
- desc
.count
;
1054 static void tcp_data_ready(struct sock
*sk
, int bytes
)
1056 struct rpc_xprt
*xprt
;
1057 read_descriptor_t rd_desc
;
1059 read_lock(&sk
->sk_callback_lock
);
1060 dprintk("RPC: tcp_data_ready...\n");
1061 if (!(xprt
= xprt_from_sock(sk
))) {
1062 printk("RPC: tcp_data_ready socket info not found!\n");
1068 /* We use rd_desc to pass struct xprt to tcp_data_recv */
1069 rd_desc
.arg
.data
= xprt
;
1070 rd_desc
.count
= 65536;
1071 tcp_read_sock(sk
, &rd_desc
, tcp_data_recv
);
1073 read_unlock(&sk
->sk_callback_lock
);
1077 tcp_state_change(struct sock
*sk
)
1079 struct rpc_xprt
*xprt
;
1081 read_lock(&sk
->sk_callback_lock
);
1082 if (!(xprt
= xprt_from_sock(sk
)))
1084 dprintk("RPC: tcp_state_change client %p...\n", xprt
);
1085 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
1086 sk
->sk_state
, xprt_connected(xprt
),
1087 sock_flag(sk
, SOCK_DEAD
),
1088 sock_flag(sk
, SOCK_ZAPPED
));
1090 switch (sk
->sk_state
) {
1091 case TCP_ESTABLISHED
:
1092 spin_lock_bh(&xprt
->sock_lock
);
1093 if (!xprt_test_and_set_connected(xprt
)) {
1094 /* Reset TCP record info */
1095 xprt
->tcp_offset
= 0;
1096 xprt
->tcp_reclen
= 0;
1097 xprt
->tcp_copied
= 0;
1098 xprt
->tcp_flags
= XPRT_COPY_RECM
| XPRT_COPY_XID
;
1099 rpc_wake_up(&xprt
->pending
);
1101 spin_unlock_bh(&xprt
->sock_lock
);
1107 xprt_disconnect(xprt
);
1111 read_unlock(&sk
->sk_callback_lock
);
1115 * Called when more output buffer space is available for this socket.
1116 * We try not to wake our writers until they can make "significant"
1117 * progress, otherwise we'll waste resources thrashing sock_sendmsg
1118 * with a bunch of small requests.
1121 xprt_write_space(struct sock
*sk
)
1123 struct rpc_xprt
*xprt
;
1124 struct socket
*sock
;
1126 read_lock(&sk
->sk_callback_lock
);
1127 if (!(xprt
= xprt_from_sock(sk
)) || !(sock
= sk
->sk_socket
))
1132 /* Wait until we have enough socket memory */
1134 /* from net/core/stream.c:sk_stream_write_space */
1135 if (sk_stream_wspace(sk
) < sk_stream_min_wspace(sk
))
1138 /* from net/core/sock.c:sock_def_write_space */
1139 if (!sock_writeable(sk
))
1143 if (!test_and_clear_bit(SOCK_NOSPACE
, &sock
->flags
))
1146 spin_lock_bh(&xprt
->sock_lock
);
1148 rpc_wake_up_task(xprt
->snd_task
);
1149 spin_unlock_bh(&xprt
->sock_lock
);
1151 read_unlock(&sk
->sk_callback_lock
);
1155 * RPC receive timeout handler.
1158 xprt_timer(struct rpc_task
*task
)
1160 struct rpc_rqst
*req
= task
->tk_rqstp
;
1161 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1163 spin_lock(&xprt
->sock_lock
);
1164 if (req
->rq_received
)
1167 xprt_adjust_cwnd(req
->rq_xprt
, -ETIMEDOUT
);
1168 __xprt_put_cong(xprt
, req
);
1170 dprintk("RPC: %4d xprt_timer (%s request)\n",
1171 task
->tk_pid
, req
? "pending" : "backlogged");
1173 task
->tk_status
= -ETIMEDOUT
;
1175 task
->tk_timeout
= 0;
1176 rpc_wake_up_task(task
);
1177 spin_unlock(&xprt
->sock_lock
);
1181 * Place the actual RPC call.
1182 * We have to copy the iovec because sendmsg fiddles with its contents.
1185 xprt_prepare_transmit(struct rpc_task
*task
)
1187 struct rpc_rqst
*req
= task
->tk_rqstp
;
1188 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1191 dprintk("RPC: %4d xprt_prepare_transmit\n", task
->tk_pid
);
1196 spin_lock_bh(&xprt
->sock_lock
);
1197 if (req
->rq_received
&& !req
->rq_bytes_sent
) {
1198 err
= req
->rq_received
;
1201 if (!__xprt_lock_write(xprt
, task
)) {
1206 if (!xprt_connected(xprt
)) {
1211 spin_unlock_bh(&xprt
->sock_lock
);
1216 xprt_transmit(struct rpc_task
*task
)
1218 struct rpc_clnt
*clnt
= task
->tk_client
;
1219 struct rpc_rqst
*req
= task
->tk_rqstp
;
1220 struct rpc_xprt
*xprt
= req
->rq_xprt
;
1221 int status
, retry
= 0;
1224 dprintk("RPC: %4d xprt_transmit(%u)\n", task
->tk_pid
, req
->rq_slen
);
1226 /* set up everything as needed. */
1227 /* Write the record marker */
1229 u32
*marker
= req
->rq_svec
[0].iov_base
;
1231 *marker
= htonl(0x80000000|(req
->rq_slen
-sizeof(*marker
)));
1235 if (!req
->rq_received
) {
1236 if (list_empty(&req
->rq_list
)) {
1237 spin_lock_bh(&xprt
->sock_lock
);
1238 /* Update the softirq receive buffer */
1239 memcpy(&req
->rq_private_buf
, &req
->rq_rcv_buf
,
1240 sizeof(req
->rq_private_buf
));
1241 /* Add request to the receive list */
1242 list_add_tail(&req
->rq_list
, &xprt
->recv
);
1243 spin_unlock_bh(&xprt
->sock_lock
);
1244 xprt_reset_majortimeo(req
);
1245 /* Turn off autodisconnect */
1246 del_singleshot_timer_sync(&xprt
->timer
);
1248 } else if (!req
->rq_bytes_sent
)
1251 /* Continue transmitting the packet/record. We must be careful
1252 * to cope with writespace callbacks arriving _after_ we have
1253 * called xprt_sendmsg().
1256 req
->rq_xtime
= jiffies
;
1257 status
= xprt_sendmsg(xprt
, req
);
1263 req
->rq_bytes_sent
+= status
;
1265 /* If we've sent the entire packet, immediately
1266 * reset the count of bytes sent. */
1267 if (req
->rq_bytes_sent
>= req
->rq_slen
) {
1268 req
->rq_bytes_sent
= 0;
1272 if (status
>= req
->rq_slen
)
1278 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
1279 task
->tk_pid
, req
->rq_slen
- req
->rq_bytes_sent
,
1287 /* Note: at this point, task->tk_sleeping has not yet been set,
1288 * hence there is no danger of the waking up task being put on
1289 * schedq, and being picked up by a parallel run of rpciod().
1291 task
->tk_status
= status
;
1295 if (test_bit(SOCK_ASYNC_NOSPACE
, &xprt
->sock
->flags
)) {
1296 /* Protect against races with xprt_write_space */
1297 spin_lock_bh(&xprt
->sock_lock
);
1298 /* Don't race with disconnect */
1299 if (!xprt_connected(xprt
))
1300 task
->tk_status
= -ENOTCONN
;
1301 else if (test_bit(SOCK_NOSPACE
, &xprt
->sock
->flags
)) {
1302 task
->tk_timeout
= req
->rq_timeout
;
1303 rpc_sleep_on(&xprt
->pending
, task
, NULL
, NULL
);
1305 spin_unlock_bh(&xprt
->sock_lock
);
1308 /* Keep holding the socket if it is blocked */
1309 rpc_delay(task
, HZ
>>4);
1312 task
->tk_timeout
= RPC_REESTABLISH_TIMEOUT
;
1313 rpc_sleep_on(&xprt
->sending
, task
, NULL
, NULL
);
1318 xprt_disconnect(xprt
);
1320 xprt_release_write(xprt
, task
);
1323 dprintk("RPC: %4d xmit complete\n", task
->tk_pid
);
1324 /* Set the task's receive timeout value */
1325 spin_lock_bh(&xprt
->sock_lock
);
1326 if (!xprt
->nocong
) {
1327 int timer
= task
->tk_msg
.rpc_proc
->p_timer
;
1328 task
->tk_timeout
= rpc_calc_rto(clnt
->cl_rtt
, timer
);
1329 task
->tk_timeout
<<= rpc_ntimeo(clnt
->cl_rtt
, timer
) + req
->rq_retries
;
1330 if (task
->tk_timeout
> xprt
->timeout
.to_maxval
|| task
->tk_timeout
== 0)
1331 task
->tk_timeout
= xprt
->timeout
.to_maxval
;
1333 task
->tk_timeout
= req
->rq_timeout
;
1334 /* Don't race with disconnect */
1335 if (!xprt_connected(xprt
))
1336 task
->tk_status
= -ENOTCONN
;
1337 else if (!req
->rq_received
)
1338 rpc_sleep_on(&xprt
->pending
, task
, NULL
, xprt_timer
);
1339 __xprt_release_write(xprt
, task
);
1340 spin_unlock_bh(&xprt
->sock_lock
);
1344 * Reserve an RPC call slot.
1347 do_xprt_reserve(struct rpc_task
*task
)
1349 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1351 task
->tk_status
= 0;
1354 if (!list_empty(&xprt
->free
)) {
1355 struct rpc_rqst
*req
= list_entry(xprt
->free
.next
, struct rpc_rqst
, rq_list
);
1356 list_del_init(&req
->rq_list
);
1357 task
->tk_rqstp
= req
;
1358 xprt_request_init(task
, xprt
);
1361 dprintk("RPC: waiting for request slot\n");
1362 task
->tk_status
= -EAGAIN
;
1363 task
->tk_timeout
= 0;
1364 rpc_sleep_on(&xprt
->backlog
, task
, NULL
, NULL
);
1368 xprt_reserve(struct rpc_task
*task
)
1370 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1372 task
->tk_status
= -EIO
;
1373 if (!xprt
->shutdown
) {
1374 spin_lock(&xprt
->xprt_lock
);
1375 do_xprt_reserve(task
);
1376 spin_unlock(&xprt
->xprt_lock
);
1381 * Allocate a 'unique' XID
1383 static inline u32
xprt_alloc_xid(struct rpc_xprt
*xprt
)
1388 static inline void xprt_init_xid(struct rpc_xprt
*xprt
)
1390 get_random_bytes(&xprt
->xid
, sizeof(xprt
->xid
));
1394 * Initialize RPC request
/*
 * xprt_request_init - set up the rpc_rqst just claimed by do_xprt_reserve():
 * initial timeout, back-pointers to task/transport, and a fresh XID.
 * NOTE(review): lossy extraction -- code lines kept byte-identical.
 */
1397 xprt_request_init(struct rpc_task
*task
, struct rpc_xprt
*xprt
)
1399 struct rpc_rqst
*req
= task
->tk_rqstp
;
/* Start from the transport's configured initial timeout. */
1401 req
->rq_timeout
= xprt
->timeout
.to_initval
;
1402 req
->rq_task
= task
;
1403 req
->rq_xprt
= xprt
;
1404 req
->rq_xid
= xprt_alloc_xid(xprt
);
/* Debug trace: which task got which slot/XID. */
1405 dprintk("RPC: %4d reserved req %p xid %08x\n", task
->tk_pid
,
1406 req
, ntohl(req
->rq_xid
));
1410 * Release an RPC call slot
/*
 * xprt_release - return @task's request slot to the transport.
 * Drops the write lock and congestion slot, unlinks the request from the
 * receive list, arms the autodisconnect timer when the transport goes idle,
 * then recycles the slot onto the free list and wakes a backlog waiter.
 * NOTE(review): lossy extraction -- early-return and brace lines are
 * missing; code lines kept byte-identical.
 */
1413 xprt_release(struct rpc_task
*task
)
1415 struct rpc_xprt
*xprt
= task
->tk_xprt
;
1416 struct rpc_rqst
*req
;
/* Nothing to do if the task never got a slot. */
1418 if (!(req
= task
->tk_rqstp
))
/* sock_lock protects the recv list, congestion state and idle timer. */
1420 spin_lock_bh(&xprt
->sock_lock
);
1421 __xprt_release_write(xprt
, task
);
1422 __xprt_put_cong(xprt
, req
);
/* Unlink from the pending-receive list if still queued there. */
1423 if (!list_empty(&req
->rq_list
))
1424 list_del(&req
->rq_list
);
1425 xprt
->last_used
= jiffies
;
/* No requests awaiting replies: schedule the idle autodisconnect. */
1426 if (list_empty(&xprt
->recv
) && !xprt
->shutdown
)
1427 mod_timer(&xprt
->timer
, xprt
->last_used
+ XPRT_IDLE_TIMEOUT
);
1428 spin_unlock_bh(&xprt
->sock_lock
);
1429 task
->tk_rqstp
= NULL
;
1430 memset(req
, 0, sizeof(*req
)); /* mark unused */
1432 dprintk("RPC: %4d release request %p\n", task
->tk_pid
, req
);
/* Recycle the slot under xprt_lock and let a backlogged task claim it. */
1434 spin_lock(&xprt
->xprt_lock
);
1435 list_add(&req
->rq_list
, &xprt
->free
);
1436 xprt_clear_backlog(xprt
);
1437 spin_unlock(&xprt
->xprt_lock
);
1441 * Set default timeout parameters
/*
 * xprt_default_timeout - pick per-protocol defaults: 5 retries with a
 * 5s increment for UDP, 5 retries with a 60s increment for TCP.
 * NOTE(review): the `else` line between the two calls was dropped by the
 * lossy extraction; code lines kept byte-identical.
 */
1444 xprt_default_timeout(struct rpc_timeout
*to
, int proto
)
1446 if (proto
== IPPROTO_UDP
)
1447 xprt_set_timeout(to
, 5, 5 * HZ
);
1449 xprt_set_timeout(to
, 5, 60 * HZ
);
1453 * Set constant timeout
/*
 * xprt_set_timeout - fill in a non-exponential (constant-increment)
 * rpc_timeout: @retr retransmits, @incr jiffies between them, and a
 * cap of incr * retr.
 * NOTE(review): original line 1458 (between the parameter list and the
 * to_increment assignment) was dropped by the lossy extraction -- it may
 * contain an additional assignment (e.g. to_initval); confirm against the
 * full file. Visible code lines kept byte-identical.
 */
1456 xprt_set_timeout(struct rpc_timeout
*to
, unsigned int retr
, unsigned long incr
)
1459 to
->to_increment
= incr
;
1460 to
->to_maxval
= incr
* retr
;
1461 to
->to_retries
= retr
;
/* 0 = linear backoff; the increment is added, not doubled. */
1462 to
->to_exponential
= 0;
/* Number of request slots per transport (tunable); both protocols
 * default to RPC_DEF_SLOT_TABLE. Consumed by xprt_setup() below. */
1465 unsigned int xprt_udp_slot_table_entries
= RPC_DEF_SLOT_TABLE
;
1466 unsigned int xprt_tcp_slot_table_entries
= RPC_DEF_SLOT_TABLE
;
1469 * Initialize an RPC client
/*
 * xprt_setup - allocate and initialize a struct rpc_xprt for @proto
 * (UDP or TCP) with the caller-supplied (or default) timeouts.
 * Returns ERR_PTR(-ENOMEM) on allocation failure.
 * NOTE(review): lossy extraction -- several brace/else/blank lines and a
 * few statements are missing from view; code lines kept byte-identical.
 */
1471 static struct rpc_xprt
*
1472 xprt_setup(int proto
, struct sockaddr_in
*ap
, struct rpc_timeout
*to
)
1474 struct rpc_xprt
*xprt
;
1475 unsigned int entries
;
1476 size_t slot_table_size
;
1477 struct rpc_rqst
*req
;
1479 dprintk("RPC: setting up %s transport...\n",
1480 proto
== IPPROTO_UDP
? "UDP" : "TCP");
/* Slot count comes from the per-protocol tunables above. */
1482 entries
= (proto
== IPPROTO_TCP
)?
1483 xprt_tcp_slot_table_entries
: xprt_udp_slot_table_entries
;
/* Allocate and zero the transport itself... */
1485 if ((xprt
= kmalloc(sizeof(struct rpc_xprt
), GFP_KERNEL
)) == NULL
)
1486 return ERR_PTR(-ENOMEM
);
1487 memset(xprt
, 0, sizeof(*xprt
)); /* Nnnngh! */
1488 xprt
->max_reqs
= entries
;
/* ...then the request slot table. */
1489 slot_table_size
= entries
* sizeof(xprt
->slot
[0]);
1490 xprt
->slot
= kmalloc(slot_table_size
, GFP_KERNEL
);
1491 if (xprt
->slot
== NULL
) {
/* NOTE(review): the kfree(xprt) that the original presumably does
 * before this return is not visible in this extraction -- confirm. */
1493 return ERR_PTR(-ENOMEM
);
1495 memset(xprt
->slot
, 0, slot_table_size
);
1499 xprt
->stream
= (proto
== IPPROTO_TCP
)? 1 : 0;
/* Stream vs datagram setup: the guarding if/else lines were dropped by
 * the extraction. TCP gets the max congestion window and a ~2GB payload
 * cap; UDP gets the initial window and a <64KB datagram-limited payload. */
1501 xprt
->cwnd
= RPC_MAXCWND(xprt
);
1503 xprt
->max_payload
= (1U << 31) - 1;
1505 xprt
->cwnd
= RPC_INITCWND
;
1506 xprt
->max_payload
= (1U << 16) - (MAX_HEADER
<< 3);
/* Locks and wait queue for socket state and congestion control. */
1508 spin_lock_init(&xprt
->sock_lock
);
1509 spin_lock_init(&xprt
->xprt_lock
);
1510 init_waitqueue_head(&xprt
->cong_wait
);
1512 INIT_LIST_HEAD(&xprt
->free
);
1513 INIT_LIST_HEAD(&xprt
->recv
);
/* Deferred work: (re)connect and autoclose run from the workqueue. */
1514 INIT_WORK(&xprt
->sock_connect
, xprt_socket_connect
, xprt
);
1515 INIT_WORK(&xprt
->task_cleanup
, xprt_socket_autoclose
, xprt
);
/* Idle autodisconnect timer, armed in xprt_release(). */
1516 init_timer(&xprt
->timer
);
1517 xprt
->timer
.function
= xprt_init_autodisconnect
;
1518 xprt
->timer
.data
= (unsigned long) xprt
;
1519 xprt
->last_used
= jiffies
;
1520 xprt
->port
= XPRT_MAX_RESVPORT
;
1522 /* Set timeout parameters */
/* Use the caller's timeouts when given (the guarding if/else lines are
 * not visible here), otherwise the per-protocol defaults. */
1524 xprt
->timeout
= *to
;
1526 xprt_default_timeout(&xprt
->timeout
, xprt
->prot
);
/* RPC scheduler wait queues; backlog is a priority queue. */
1528 rpc_init_wait_queue(&xprt
->pending
, "xprt_pending");
1529 rpc_init_wait_queue(&xprt
->sending
, "xprt_sending");
1530 rpc_init_wait_queue(&xprt
->resend
, "xprt_resend");
1531 rpc_init_priority_wait_queue(&xprt
->backlog
, "xprt_backlog");
1533 /* initialize free list */
/* Walk the slot table backwards so slot[0] ends up at the list head. */
1534 for (req
= &xprt
->slot
[entries
-1]; req
>= &xprt
->slot
[0]; req
--)
1535 list_add(&req
->rq_list
, &xprt
->free
);
1537 xprt_init_xid(xprt
);
1539 /* Check whether we want to use a reserved port */
1540 xprt
->resvport
= capable(CAP_NET_BIND_SERVICE
) ? 1 : 0;
1542 dprintk("RPC: created transport %p with %u slots\n", xprt
,
1549 * Bind to a reserved port
/*
 * xprt_bindresvport - bind @sock to a privileged (<1024) local port,
 * retrying downwards on EADDRINUSE and preferring the port the transport
 * used last (xprt->port) so reconnects keep the same source port.
 * NOTE(review): lossy extraction -- the declarations of err/port, the
 * do-loop head, and the success path are missing from view; visible code
 * lines kept byte-identical.
 */
1551 static inline int xprt_bindresvport(struct rpc_xprt
*xprt
, struct socket
*sock
)
1553 struct sockaddr_in myaddr
= {
1554 .sin_family
= AF_INET
,
1558 /* Were we already bound to a given port? Try to reuse it */
1561 myaddr
.sin_port
= htons(port
);
1562 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &myaddr
,
/* Wrap around to the top of the reserved range and keep trying until
 * we come back to the port we started from. */
1569 port
= XPRT_MAX_RESVPORT
;
1570 } while (err
== -EADDRINUSE
&& port
!= xprt
->port
);
1572 printk("RPC: Can't bind to reserved port (%d).\n", -err
);
/*
 * xprt_bind_socket - attach @sock to the transport: save the socket's
 * original callbacks, install the RPC data_ready/state_change/write_space
 * handlers, and set per-protocol socket options.
 * Runs under sk_callback_lock so callbacks never see a half-updated socket.
 * NOTE(review): lossy extraction -- the UDP/TCP else line and the
 * "Reset to new socket" assignments are missing from view; code lines
 * kept byte-identical.
 */
1577 xprt_bind_socket(struct rpc_xprt
*xprt
, struct socket
*sock
)
1579 struct sock
*sk
= sock
->sk
;
1584 write_lock_bh(&sk
->sk_callback_lock
);
/* Back-pointer used by the data_ready callbacks to find the xprt. */
1585 sk
->sk_user_data
= xprt
;
/* Remember the original callbacks so they can be restored on close. */
1586 xprt
->old_data_ready
= sk
->sk_data_ready
;
1587 xprt
->old_state_change
= sk
->sk_state_change
;
1588 xprt
->old_write_space
= sk
->sk_write_space
;
1589 if (xprt
->prot
== IPPROTO_UDP
) {
1590 sk
->sk_data_ready
= udp_data_ready
;
/* Don't checksum-verify received UDP datagrams here. */
1591 sk
->sk_no_check
= UDP_CSUM_NORCV
;
/* UDP is connectionless: mark connected immediately. */
1592 xprt_set_connected(xprt
);
1594 tcp_sk(sk
)->nonagle
= 1; /* disable Nagle's algorithm */
1595 sk
->sk_data_ready
= tcp_data_ready
;
1596 sk
->sk_state_change
= tcp_state_change
;
/* TCP must complete a handshake before it counts as connected. */
1597 xprt_clear_connected(xprt
);
1599 sk
->sk_write_space
= xprt_write_space
;
1601 /* Reset to new socket */
1604 write_unlock_bh(&sk
->sk_callback_lock
);
1610 * Set socket buffer length
/*
 * xprt_sock_setbufsize - apply the transport's requested socket buffer
 * sizes, locking them against autotuning via sk_userlocks. Buffers are
 * sized per request slot (size * max_reqs * 2).
 * NOTE(review): lossy extraction -- code lines kept byte-identical.
 */
1613 xprt_sock_setbufsize(struct rpc_xprt
*xprt
)
1615 struct sock
*sk
= xprt
->inet
;
1619 if (xprt
->rcvsize
) {
1620 sk
->sk_userlocks
|= SOCK_RCVBUF_LOCK
;
1621 sk
->sk_rcvbuf
= xprt
->rcvsize
* xprt
->max_reqs
* 2;
1623 if (xprt
->sndsize
) {
1624 sk
->sk_userlocks
|= SOCK_SNDBUF_LOCK
;
1625 sk
->sk_sndbuf
= xprt
->sndsize
* xprt
->max_reqs
* 2;
/* Kick the write_space callback: the send buffer may now have room. */
1626 sk
->sk_write_space(sk
);
1631 * Datastream sockets are created here, but xprt_connect will create
1632 * and connect stream sockets.
/*
 * xprt_create_socket - create an in-kernel PF_INET socket of the right
 * type for @proto and, when @resvport is set, bind it to a reserved port.
 * NOTE(review): lossy extraction -- the declarations of type/err, the
 * error-path returns and the final success return are missing from view;
 * visible code lines kept byte-identical.
 */
1634 static struct socket
* xprt_create_socket(struct rpc_xprt
*xprt
, int proto
, int resvport
)
1636 struct socket
*sock
;
1639 dprintk("RPC: xprt_create_socket(%s %d)\n",
1640 (proto
== IPPROTO_UDP
)? "udp" : "tcp", proto
);
/* UDP -> datagram socket, anything else (TCP) -> stream socket. */
1642 type
= (proto
== IPPROTO_UDP
)? SOCK_DGRAM
: SOCK_STREAM
;
1644 if ((err
= sock_create_kern(PF_INET
, type
, proto
, &sock
)) < 0) {
1645 printk("RPC: can't create socket (%d).\n", -err
);
1649 /* If the caller has the capability, bind to a reserved port */
1650 if (resvport
&& xprt_bindresvport(xprt
, sock
) < 0) {
1651 printk("RPC: can't bind to reserved port.\n");
1663 * Create an RPC client transport given the protocol and peer address.
/*
 * xprt_create_proto - public constructor: delegates to xprt_setup() and
 * traces success/failure.
 * NOTE(review): lossy extraction -- the IS_ERR check and return statement
 * are missing from view; visible code lines kept byte-identical.
 */
1666 xprt_create_proto(int proto
, struct sockaddr_in
*sap
, struct rpc_timeout
*to
)
1668 struct rpc_xprt
*xprt
;
1670 xprt
= xprt_setup(proto
, sap
, to
);
1672 dprintk("RPC: xprt_create_proto failed\n");
1674 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt
);
1679 * Prepare for transport shutdown.
/*
 * xprt_shutdown - wake every task waiting on the transport so they can
 * see the shutdown, stop the idle timer, and wait for the connect worker.
 * NOTE(review): lossy extraction -- the xprt->shutdown = 1 assignment the
 * original presumably makes before the wakeups is not visible here;
 * visible code lines kept byte-identical.
 */
1682 xprt_shutdown(struct rpc_xprt
*xprt
)
/* Flush all four RPC wait queues plus the congestion waiters. */
1685 rpc_wake_up(&xprt
->sending
);
1686 rpc_wake_up(&xprt
->resend
);
1687 rpc_wake_up(&xprt
->pending
);
1688 rpc_wake_up(&xprt
->backlog
);
1689 wake_up(&xprt
->cong_wait
);
/* Guarantee the autodisconnect timer is not (and will not be) running. */
1690 del_timer_sync(&xprt
->timer
);
1692 /* synchronously wait for connect worker to finish */
1693 cancel_delayed_work(&xprt
->sock_connect
);
1694 flush_scheduled_work();
1698 * Clear the xprt backlog queue
/*
 * xprt_clear_backlog - wake the next task sleeping on the backlog queue
 * (a request slot just became free) plus any congestion waiters.
 * Called from xprt_release() under xprt->xprt_lock.
 * NOTE(review): lossy extraction -- the return value/closing lines are
 * not visible; visible code lines kept byte-identical.
 */
1701 xprt_clear_backlog(struct rpc_xprt
*xprt
) {
1702 rpc_wake_up_next(&xprt
->backlog
);
1703 wake_up(&xprt
->cong_wait
);
1708 * Destroy an RPC transport, killing off all requests.
1711 xprt_destroy(struct rpc_xprt
*xprt
)
1713 dprintk("RPC: destroying transport %p\n", xprt
);
1714 xprt_shutdown(xprt
);
1715 xprt_disconnect(xprt
);