/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
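/*
 * Overview of the request life cycle in terms of the functions below:
 *	xprt_reserve()		reserve a request slot, or sleep on backlog
 *	xprt_transmit()		send the request, arm xprt_timer(), and
 *				sleep on xprt->pending awaiting the reply
 *	udp_data_ready() /
 *	tcp_data_recv()		match the reply XID (xprt_lookup_rqst) and
 *				complete the request (xprt_complete_rqst)
 *	xprt_release()		return the slot to the free list
 */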
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include <asm/uaccess.h>
/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)
/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	do_xprt_transmit(struct rpc_task *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_conn_status(struct rpc_task *task);
static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
						struct rpc_timeout *to);
static struct socket *xprt_create_socket(int, struct rpc_timeout *, int);
static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->user_data;
}
/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (!xprt->snd_task) {
		if (xprt->nocong || __xprt_get_cong(xprt, task))
			xprt->snd_task = task;
	}
	if (xprt->snd_task != task) {
		dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
		task->tk_timeout = 0;
		task->tk_status = -EAGAIN;
		if (task->tk_rqstp && task->tk_rqstp->rq_nresend)
			rpc_sleep_on(&xprt->resend, task, NULL, NULL);
		else
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	}
	return xprt->snd_task == task;
}
static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}
static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (xprt->snd_task)
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		return;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			return;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task))
		xprt->snd_task = task;
}
/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task)
		xprt->snd_task = NULL;
	__xprt_lock_write_next(xprt);
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket *sock = xprt->sock;
	struct msghdr msg;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct iovec niv[MAX_IOVEC];
	unsigned int niov, slen, skip;
	mm_segment_t oldfs;
	int result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Don't repeat bytes */
	skip = req->rq_bytes_sent;
	slen = xdr->len - skip;
	niov = xdr_kmap(niv, xdr, skip);

	msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov = niv;
	msg.msg_iovlen = niov;
	msg.msg_name = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;
	msg.msg_controllen = 0;

	oldfs = get_fs(); set_fs(get_ds());
	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	result = sock_sendmsg(sock, &msg, slen);
	set_fs(oldfs);

	xdr_kunmap(xdr, skip);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}
/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}
/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
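/*
 * A minimal, illustrative sketch of the AIMD arithmetic above, with the
 * locking, logging and writer wake-up stripped away; unlike the real
 * code it also grows the window unconditionally on success, whereas
 * xprt_adjust_cwnd() only grows it when the window is actually full
 * (cwnd <= cong). It assumes RPC_CWNDSCALE and RPC_MAXCWND as defined
 * in include/linux/sunrpc/xprt.h.
 */
static inline unsigned long
example_cwnd_step(unsigned long cwnd, int result)
{
	if (result >= 0) {
		/* additive increase: about RPC_CWNDSCALE^2 / cwnd per reply,
		 * i.e. one full request slot every cwnd/RPC_CWNDSCALE replies */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
	} else if (result == -ETIMEDOUT) {
		/* multiplicative decrease: halve, but never below one slot */
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	return cwnd;
}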
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}
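/*
 * Worked example: with the UDP default xprt_set_timeout(to, 5, 5 * HZ)
 * (see xprt_default_timeout below), to_exponential is 0 and to_increment
 * is 5 seconds, so to_current steps through 10s, 15s, 20s and 25s, after
 * which it is clamped at to_maxval = 5 * 5s = 25s. Once to_retries is
 * used up, the function returns 0, signalling a major timeout.
 */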
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket *sock = xprt->sock;
	struct sock *sk = xprt->inet;

	if (!sk)
		return;

	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data = NULL;
	sk->data_ready = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space = xprt->old_write_space;

	xprt_disconnect(xprt);
	sk->no_check = 0;

	sock_release(sock);
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}
/*
 * Attempt to connect a TCP socket.
 *
 * NB: This never collides with TCP reads, as both run from rpciod
 */
void
xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct socket *sock = xprt->sock;
	struct sock *inet;
	int status;

	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		goto out_write;

	/*
	 * We're here because the xprt was marked disconnected.
	 * Start by resetting any existing state.
	 */
	xprt_close(xprt);
	if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout, xprt->resvport))) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */
		task->tk_status = -EIO;
		goto out_write;
	}
	xprt_bind_socket(xprt, sock);
	inet = sock->sk;

	/*
	 * Tell the socket layer to start connecting...
	 */
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %4d connect status %d connected %d sock state %d\n",
			task->tk_pid, -status, xprt_connected(xprt), inet->state);

	switch (status) {
	case -EINPROGRESS:
	case -EALREADY:
		/* Protect against TCP socket state changes */
		lock_sock(inet);
		if (inet->state != TCP_ESTABLISHED) {
			dprintk("RPC: %4d waiting for connection\n",
					task->tk_pid);
			task->tk_timeout = RPC_CONNECT_TIMEOUT;
			/* if the socket is already closing, delay briefly */
			if ((1 << inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
				task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
			rpc_sleep_on(&xprt->pending, task, xprt_conn_status,
					NULL);
			release_sock(inet);
			/* task status set when task wakes up again */
			return;
		}
		release_sock(inet);
		task->tk_status = 0;
		break;

	case 0:
	case -EISCONN:	/* not likely, but just in case */
		/* Half closed state. No race -- this socket is dead. */
		if (inet->state != TCP_ESTABLISHED) {
			xprt_close(xprt);
			task->tk_status = -EAGAIN;
			goto out_write;
		}

		/* Otherwise, the connection is already established. */
		task->tk_status = 0;
		break;

	case -EPIPE:
		xprt_close(xprt);
		task->tk_status = -ENOTCONN;
		goto out_write;

	default:
		/* Report myriad other possible returns. If this file
		 * system is soft mounted, just error out, like Solaris. */
		xprt_close(xprt);
		if (task->tk_client->cl_softrtry) {
			printk(KERN_WARNING
				"RPC: error %d connecting to server %s, exiting\n",
				-status, task->tk_client->cl_server);
			task->tk_status = -EIO;
		} else {
			printk(KERN_WARNING
				"RPC: error %d connecting to server %s\n",
				-status, task->tk_client->cl_server);
			rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
			task->tk_status = status;
		}
		break;
	}

 out_write:
	xprt_release_write(xprt, task);
}
/*
 * We arrive here when awoken from waiting on connection establishment.
 */
static void
xprt_conn_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	switch (task->tk_status) {
	case 0:
		dprintk("RPC: %4d xprt_conn_status: connection established\n",
				task->tk_pid);
		goto out;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_conn_status: timed out\n",
				task->tk_pid);
		/* prevent TCP from continuing to retry SYNs */
		xprt_close(xprt);
		break;
	default:
		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
				-task->tk_status, task->tk_client->cl_server);
		xprt_close(xprt);
		rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
		break;
	}
	/* if soft mounted, cause this RPC to fail */
	if (task->tk_client->cl_softrtry)
		task->tk_status = -EIO;

 out:
	xprt_release_write(xprt, task);
}
/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst *req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task *task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (!req->rq_nresend) {
			unsigned timer =
				task->tk_msg.rpc_proc->p_timer;
			if (timer)
				rpc_update_rtt(&clnt->cl_rtt, timer,
						(long)jiffies - req->rq_xtime);
		}
		rpc_clear_timeo(&clnt->cl_rtt);
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long nextstat = 0;
		static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	req->rq_received = copied;
	list_del_init(&req->rq_list);

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}
static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, to, len);
	desc->count -= len;
	desc->offset += len;
	return len;
}

static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
	unsigned int csum2, pos;

	if (len > desc->count)
		len = desc->count;
	pos = desc->offset;
	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
	desc->csum = csum_block_add(desc->csum, csum2, pos);
	desc->count -= len;
	desc->offset += len;
	return len;
}
/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	skb_reader_t desc;

	desc.skb = skb;
	desc.offset = sizeof(struct udphdr);
	desc.count = skb->len - desc.offset;

	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
		goto no_checksum;

	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
	if (desc.offset != skb->len) {
		unsigned int csum2;
		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
	}
	if ((unsigned short)csum_fold(desc.csum))
		return -1;
	return 0;
 no_checksum:
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
	return 0;
}
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;

	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
			(u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_rcv_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	spin_unlock(&xprt->sock_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
}
/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	skb_copy_bits(desc->skb, desc->offset, p, len);
	desc->offset += len;
	desc->count -= len;
	return len;
}
/*
 * TCP read fragment marker
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}
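/*
 * A minimal, illustrative sketch of the record-marking decode performed
 * above (RFC 1831 record marking over TCP): each fragment starts with a
 * 4-byte big-endian marker whose top bit flags the last fragment of a
 * record and whose low 31 bits give the fragment length.
 */
static inline u32
example_decode_marker(u32 marker_on_wire, int *last_frag)
{
	u32 marker = ntohl(marker_on_wire);

	*last_frag = (marker & 0x80000000) != 0;
	return marker & 0x7fffffff;	/* fragment length in bytes */
}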
static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}
/*
 * TCP read xid
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
	tcp_check_recm(xprt);
}
/*
 * TCP read and complete request
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC: XID %08x request not found!\n",
				xprt->tcp_xid);
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_rcv_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
						&my_desc, tcp_copy_data);
		desc->count -= len;
		desc->offset += len;
	} else
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
						desc, tcp_copy_data);
	xprt->tcp_copied += len;
	xprt->tcp_offset += len;

	if (xprt->tcp_copied == req->rq_rlen)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}
/*
 * TCP discard extra bytes from a short read
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	tcp_check_recm(xprt);
}
/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
	skb_reader_t desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
		.csum	= 0
	};

	dprintk("RPC: tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count && xprt_connected(xprt));
	dprintk("RPC: tcp_data_recv done\n");
	return len - desc.count;
}
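/*
 * Receive-side state machine in brief: tcp_data_recv() loops over the
 * bytes TCP hands it, and the XPRT_COPY_* flags select the current state:
 * COPY_RECM (accumulate the 4-byte record marker) -> COPY_XID (accumulate
 * the 4-byte XID) -> COPY_DATA (copy payload into the matched request's
 * receive buffer), with tcp_read_discard() draining any bytes left in a
 * fragment once the request buffer is full. tcp_check_recm() rotates the
 * flags back to COPY_RECM at each fragment boundary.
 */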
static void tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: tcp_data_ready socket info not found!\n");
		return;
	}
	if (xprt->shutdown)
		return;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.buf = (char *)xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
}
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
			sk->state, xprt_connected(xprt),
			sk->dead, sk->zapped);

	switch (sk->state) {
	case TCP_ESTABLISHED:
		if (xprt_test_and_set_connected(xprt))
			break;

		/* Reset TCP record info */
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
		xprt->tcp_copied = 0;
		xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;

		spin_lock(&xprt->sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
			rpc_wake_up_task(xprt->snd_task);
		spin_unlock(&xprt->sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible_all(sk->sleep);
}
/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
	struct rpc_xprt *xprt;
	struct socket *sock;

	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
		return;
	if (xprt->shutdown)
		return;

	/* Wait until we have enough socket memory */
	if (xprt->stream) {
		/* from net/ipv4/tcp.c:tcp_write_space */
		if (tcp_wspace(sk) < tcp_min_write_space(sk))
			return;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
		if (!sock_writeable(sk))
			return;
	}

	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
		return;

	spin_lock_bh(&xprt->sock_lock);
	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
		rpc_wake_up_task(xprt->snd_task);
	spin_unlock_bh(&xprt->sock_lock);
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
}
/*
 * Exponential backoff for UDP retries
 */
static inline int
xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
{
	int backoff;

	req->rq_ntimeo++;
	backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
	if (req->rq_ntimeo < (1 << backoff))
		return 1;
	return 0;
}
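/*
 * Worked example: if the RTT estimator reports rpc_ntimeo() == 3, the
 * first (1 << 3) - 1 = 7 timer expiries merely re-arm the timer (return
 * value 1, handled in xprt_timer below), and only the 8th consecutive
 * timeout actually leads to a retransmission; the exponent is capped at
 * XPRT_MAX_BACKOFF (8), i.e. at most 256 expiries between retransmits.
 */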
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	spin_lock(&xprt->sock_lock);
	if (req->rq_received)
		goto out;

	if (!xprt->nocong) {
		if (xprt_expbackoff(task, req)) {
			rpc_add_timer(task, xprt_timer);
			goto out_unlock;
		}
		rpc_inc_timeo(&task->tk_client->cl_rtt);
		xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
		__xprt_put_cong(xprt, req);
	}
	req->rq_nresend++;

	dprintk("RPC: %4d xprt_timer (%s request)\n",
			task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
 out:
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
 out_unlock:
	spin_unlock(&xprt->sock_lock);
}
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
			*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown)
		task->tk_status = -EIO;

	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;

	if (task->tk_status < 0)
		return;

	if (task->tk_rpcwait)
		rpc_remove_wait_queue(task);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32 *marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
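		/* The record marker is the RPC record length with the top
		 * bit set, since the request is always sent as a single
		 * (and thus last) fragment. The marker's own four bytes are
		 * not counted in the length, hence the sizeof(*marker)
		 * subtraction. */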
	}

	spin_lock_bh(&xprt->sock_lock);
	if (!__xprt_lock_write(xprt, task)) {
		spin_unlock_bh(&xprt->sock_lock);
		return;
	}
	if (list_empty(&req->rq_list)) {
		list_add_tail(&req->rq_list, &xprt->recv);
		req->rq_received = 0;
	}
	spin_unlock_bh(&xprt->sock_lock);

	do_xprt_transmit(task);
}
static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status, retry = 0;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		req->rq_xtime = jiffies;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			if (req->rq_bytes_sent >= req->rq_slen)
				goto out_receive;
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -EAGAIN;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;
	}

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 * hence there is no danger of the waking up task being put on
	 * schedq, and being picked up by a parallel run of rpciod().
	 */
	if (req->rq_received)
		goto out_release;

	task->tk_status = status;

	switch (status) {
	case -EAGAIN:
		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
			/* Protect against races with xprt_write_space */
			spin_lock_bh(&xprt->sock_lock);
			if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
				task->tk_timeout = req->rq_timeout.to_current;
				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
			}
			spin_unlock_bh(&xprt->sock_lock);
			return;
		}
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	case -ECONNREFUSED:
	case -ENOTCONN:
		if (!xprt->stream) {
			task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
			return;
		}
		/* fall through */
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
		req->rq_bytes_sent = 0;
	}

 out_release:
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	spin_unlock_bh(&xprt->sock_lock);
	return;

 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	if (!xprt->nocong) {
		task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
				task->tk_msg.rpc_proc->p_timer);
		req->rq_ntimeo = 0;
		if (task->tk_timeout > req->rq_timeout.to_maxval)
			task->tk_timeout = req->rq_timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout.to_current;
	spin_lock_bh(&xprt->sock_lock);
	if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
/*
 * Reserve an RPC call slot.
 */
void
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = -EIO;
	if (!xprt->shutdown) {
		spin_lock(&xprt->xprt_lock);
		do_xprt_reserve(task);
		spin_unlock(&xprt->xprt_lock);
	}
}
static inline void
do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (xprt->free) {
		struct rpc_rqst *req = xprt->free;
		xprt->free = req->rq_next;
		req->rq_next = NULL;
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;
	static u32 xid = 0;

	if (!xid)
		xid = get_seconds() << 12;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	req->rq_timeout = xprt->timeout;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_xid = xid++;
	if (!xid)
		xid++;
	INIT_LIST_HEAD(&req->rq_list);
}
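/*
 * XID generation in brief: the static counter is seeded once from the
 * clock (seconds since the epoch, shifted left 12 bits) so XIDs differ
 * across reboots, then simply incremented per request; the second !xid
 * check skips 0 on wraparound, so a zero XID never reaches the wire and
 * never triggers an accidental reseed.
 */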
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	__xprt_put_cong(xprt, req);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	spin_unlock_bh(&xprt->sock_lock);
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->xprt_lock);
	req->rq_next = xprt->free;
	xprt->free = req;

	xprt_clear_backlog(xprt);
	spin_unlock(&xprt->xprt_lock);
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5, 5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}
/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current =
	to->to_initval =
	to->to_increment = incr;
	to->to_maxval = incr * retr;
	to->to_retries = retr;
	to->to_exponential = 0;
}
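/*
 * Usage example: xprt_set_timeout(to, 5, 5 * HZ) (the UDP default above)
 * yields a linear 5-second backoff with to_maxval = 25 seconds and five
 * retransmissions; xprt_set_timeout(to, 5, 60 * HZ) gives TCP a flat
 * 60-second step with a 300-second cap.
 */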
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	int i;

	dprintk("RPC: setting up %s transport...\n",
			proto == IPPROTO_UDP? "UDP" : "TCP");

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND;
		xprt->nocong = 1;
	} else
		xprt->cwnd = RPC_INITCWND;
	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);

	INIT_LIST_HEAD(&xprt->recv);

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
	INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
	INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
	INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	dprintk("RPC: created transport %p\n", xprt);

	return xprt;
}
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}
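/*
 * In other words: starting at port 800, the loop walks downward through
 * the privileged (< 1024) range until bind() succeeds or every port down
 * to 1 is in use. Servers such as mountd/nfsd treat a low source port as
 * weak evidence that the request came from a root-owned process.
 */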
static void
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock *sk = sock->sk;

	if (xprt->inet)
		return;

	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->no_check = UDP_CSUM_NORCV;
		xprt_set_connected(xprt);
	} else {
		struct tcp_opt *tp = tcp_sk(sk);
		tp->nonagle = 1;	/* disable Nagle's algorithm */
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		xprt_clear_connected(xprt);
	}
	sk->write_space = xprt_write_space;

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;

	return;
}
/*
 * Set socket buffer length
 */
void
xprt_sock_setbufsize(struct rpc_xprt *xprt)
{
	struct sock *sk = xprt->inet;

	if (xprt->stream)
		return;
	if (xprt->rcvsize) {
		sk->userlocks |= SOCK_RCVBUF_LOCK;
		sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
	}
	if (xprt->sndsize) {
		sk->userlocks |= SOCK_SNDBUF_LOCK;
		sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
		sk->write_space(sk);
	}
}
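/*
 * Sizing rationale, briefly: the UDP socket buffers are locked to the
 * per-request rcvsize/sndsize times RPC_MAXCONG (the maximum number of
 * in-flight requests the congestion window allows), doubled for
 * headroom, so a full window of requests or replies fits without drops.
 */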
/*
 * Datagram sockets are created here, but xprt_connect will create
 * and connect stream sockets.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to, int resvport)
{
	struct socket *sock;
	int type, err;

	dprintk("RPC: xprt_create_socket(%s %d)\n",
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		goto failed;
	}

	/* If the caller has the capability, bind to a reserved port */
	if (resvport && xprt_bindresvport(sock) < 0) {
		printk("RPC: can't bind to reserved port.\n");
		goto failed;
	}

	return sock;

failed:
	sock_release(sock);
	return NULL;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;

	xprt = xprt_setup(proto, sap, to);
	if (!xprt)
		goto out_bad;

	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
	if (!xprt->stream) {
		struct socket *sock;

		sock = xprt_create_socket(proto, to, xprt->resvport);
		if (!sock)
			goto out_bad;
		xprt_bind_socket(xprt, sock);
	}

	dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
	return xprt;
 out_bad:
	dprintk("RPC: xprt_create_proto failed\n");
	if (xprt)
		kfree(xprt);
	return NULL;
}
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->resend);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
}
/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt) {
	rpc_wake_up_next(&xprt->backlog);
	if (waitqueue_active(&xprt->cong_wait))
		wake_up(&xprt->cong_wait);
	return 1;
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}