/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */
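
/*
 * A rough sketch of how a caller is expected to drive this interface
 * (the real call sites live in the rpc_call state machine in clnt.c);
 * the names below are the helpers defined further down in this file:
 *
 *	xprt_reserve(task);	- grab a request slot, or sleep on the backlog
 *	(marshal the call into task->tk_rqstp->rq_svec)
 *	xprt_transmit(task);	- send it; do_xprt_transmit() arms xprt_timer()
 *	(the reply wakes the task via udp_data_ready()/tcp_data_ready())
 *	xprt_release(task);	- return the slot and wake the backlog queue
 */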

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>

#include <asm/uaccess.h>

/* Following value should be > 32k + RPC overhead */
#define XPRT_MIN_WRITE_SPACE 35000

extern spinlock_t rpc_queue_lock;

/*
 * Local variables
 */

/* Spinlock for critical sections in the code. */
spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;
spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	do_xprt_transmit(struct rpc_task *);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static void	xprt_remove_pending(struct rpc_xprt *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->user_data;
}

/*
 * Adjust the iovec to move on 'n' bytes
 */
extern inline void
xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
{
	struct iovec *iv = msg->msg_iov;
	int i;

	/*
	 * Eat any sent iovecs
	 */
	while (iv->iov_len <= amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	/*
	 * And chew down the partial one
	 */
	niv[0].iov_len = iv->iov_len - amount;
	niv[0].iov_base = ((unsigned char *) iv->iov_base) + amount;
	iv++;

	/*
	 * And copy any others
	 */
	for (i = 1; i < msg->msg_iovlen; i++)
		niv[i] = *iv++;

	msg->msg_iov = niv;
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
	int		slen = req->rq_slen - req->rq_bytes_sent;
	struct iovec	niv[MAX_IOVEC];

	if (slen <= 0)
		return 0;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov     = req->rq_svec;
	msg.msg_iovlen  = req->rq_snr;
	msg.msg_name    = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;
	msg.msg_controllen = 0;

	/* Don't repeat bytes we have already sent */
	if (req->rq_bytes_sent)
		xprt_move_iov(&msg, niv, req->rq_bytes_sent);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, slen);
	set_fs(oldfs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
		break;
	case -EAGAIN:
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			result = -ENOMEM;
		break;
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
		result = 0;
	}
	return result;
}

/*
 * Read data from socket
 */
static int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	struct iovec	niv[MAX_IOVEC];
	int		result;

	if (!sock)
		return -ENOTCONN;

	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov     = iov;
	msg.msg_iovlen  = nr;
	msg.msg_name    = NULL;
	msg.msg_namelen = 0;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;

	/* Adjust the iovec if we've already filled it */
	if (shift)
		xprt_move_iov(&msg, niv, shift);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);

	dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
			iov, len, result);
	return result;
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd = xprt->cwnd;

	spin_lock_bh(&xprt_sock_lock);
	if (xprt->nocong)
		goto out;
	if (result >= 0) {
		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
			goto out;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		else
			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}

	xprt->cwnd = cwnd;
 out:
	spin_unlock_bh(&xprt_sock_lock);
}
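
/*
 * A worked example of the update above, assuming RPC_CWNDSCALE is the
 * per-request window unit (typically 256 in the sunrpc headers of this
 * era): with cwnd = 2 * RPC_CWNDSCALE = 512, a timely reply (once the
 * window is full and congtime has passed) grows cwnd by
 * (256*256 + 256) / 512 = 128, while -ETIMEDOUT halves it outright
 * (512 -> 256), never dropping below one slot's worth (RPC_CWNDSCALE).
 */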

/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct sock	*sk = xprt->inet;

	if (!sk)
		return;

	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data    = NULL;
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;

	xprt_disconnect(xprt);
	sk->no_check = 0;

	sock_release(sock);
	/*
	 * TCP doesn't require the rpciod now - other things may
	 * but rpciod handles that not us.
	 */
	if (xprt->stream)
		rpciod_down();
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	xprt->connected = 0;
	xprt->tcp_offset = 0;
	xprt->tcp_copied = 0;
	xprt->tcp_more = 0;
	xprt_remove_pending(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}

/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock = xprt->sock;
	struct sock	*inet = xprt->inet;
	int		status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
				task->tk_pid, xprt, xprt->connected);
	if (xprt->shutdown)
		return;

	if (!xprt->stream)
		return;

	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}

	spin_lock(&xprt_lock);
	if (xprt->connecting) {
		task->tk_timeout = 0;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		spin_unlock(&xprt_lock);
		return;
	}
	xprt->connecting = 1;
	spin_unlock(&xprt_lock);

	status = -ENOTCONN;
	if (!inet) {
		/* Create an unconnected socket */
		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
			goto defer;
		xprt_bind_socket(xprt, sock);
		inet = sock->sk;
	}

	xprt_disconnect(xprt);

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);

	if (status < 0) {
		switch (status) {
		case -EALREADY:
		case -EINPROGRESS:
			status = 0;
			break;
		case -EISCONN:
		case -EPIPE:
			status = 0;
			xprt_close(xprt);
			goto defer;
		default:
			printk("RPC: TCP connect error %d!\n", -status);
			xprt_close(xprt);
			goto defer;
		}

		dprintk("RPC: %4d connect status %d connected %d\n",
				task->tk_pid, status, xprt->connected);

		spin_lock_bh(&xprt_sock_lock);
		if (!xprt->connected) {
			task->tk_timeout = xprt->timeout.to_maxval;
			rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
			spin_unlock_bh(&xprt_sock_lock);
			return;
		}
		spin_unlock_bh(&xprt_sock_lock);
	}
 defer:
	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	if (status < 0) {
		rpc_delay(task, 5*HZ);
		task->tk_status = -ENOTCONN;
	}
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}

/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
				task->tk_pid, task->tk_status);

	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task	*head, *task;
	struct rpc_rqst	*req;
	int		safe = 0;

	spin_lock_bh(&rpc_queue_lock);
	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				goto out;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				goto out_bad;
			}
		} while (task != head);
	}
	dprintk("RPC: unknown XID %08x in reply.\n", xid);
 out_bad:
	req = NULL;
 out:
	if (req && !rpc_lock_task(req->rq_task))
		req = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;

	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
static int csum_partial_copy_to_page_cache(struct iovec *iov,
					   struct sk_buff *skb,
					   int copied)
{
	__u8 *pkt_data = skb->data + sizeof(struct udphdr);
	__u8 *cur_ptr = iov->iov_base;
	__kernel_size_t cur_len = iov->iov_len;
	unsigned int csum = skb->csum;
	int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
	int slack = skb->len - copied - sizeof(struct udphdr);

	if (need_csum)
		csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
	while (copied > 0) {
		if (cur_len) {
			int to_move = cur_len;
			if (to_move > copied)
				to_move = copied;
			if (need_csum)
				csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
								 to_move, csum);
			else
				memcpy(cur_ptr, pkt_data, to_move);
			pkt_data += to_move;
			copied -= to_move;
			cur_ptr += to_move;
			cur_len -= to_move;
		}
		if (cur_len <= 0) {
			iov++;
			cur_len = iov->iov_len;
			cur_ptr = iov->iov_base;
		}
	}
	if (need_csum) {
		if (slack > 0)
			csum = csum_partial(pkt_data, slack, csum);
		if ((unsigned short)csum_fold(csum))
			return -1;
	}
	return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static inline void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	int		err, repsize, copied;

	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: udp_data_ready request not found!\n");
		return;
	}

	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		return;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
		     (u32 *) (skb->h.raw+sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	rovr->rq_damaged = 1;
	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	rpc_unlock_task(task);

 dropit:
	skb_free_datagram(sk, skb);
}

/*
 * TCP read fragment marker
 */
static inline int
tcp_read_fraghdr(struct rpc_xprt *xprt)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
	}
	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
		goto done;

	want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	dprintk("RPC: reading header (%d bytes)\n", want);
	do {
		riov.iov_base = ((u8*) &xprt->tcp_recm) + xprt->tcp_offset;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		want -= result;
	} while (want);

	/* Is this another fragment in the last message */
	if (!xprt->tcp_more)
		xprt->tcp_copied = 0; /* No, so we're reading a new message */

	/* Get the record length and mask out the last fragment bit */
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
	xprt->tcp_reclen &= 0x7fffffff;

	dprintk("RPC: New record reclen %d morefrags %d\n",
				xprt->tcp_reclen, xprt->tcp_more);
 done:
	return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
}
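
/*
 * The four bytes parsed above are the standard ONC RPC record mark for
 * stream transports (RFC 1831's record marking): the top bit flags the
 * final fragment of a record and the low 31 bits carry the fragment
 * length.  For instance, a marker of htonl(0x800001d0) announces a last
 * fragment of 0x1d0 = 464 bytes; xprt_transmit() below builds the
 * outgoing marker the same way.
 */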

/*
 * TCP read xid
 */
static inline int
tcp_read_xid(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
		goto done;
	want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
	do {
		dprintk("RPC: reading xid (%d bytes)\n", want);
		riov.iov_base = ((u8*) &xprt->tcp_xid) + xprt->tcp_copied;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		want  -= result;
		avail -= result;
	} while (want);
 done:
	return avail;
}

/*
 * TCP read and complete request
 */
static inline int
tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
{
	int	want, result;

	if (req->rq_rlen <= xprt->tcp_copied || !avail)
		goto done;
	want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
	do {
		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);

		result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		avail -= result;
		want  -= result;
	} while (want);

 done:
	if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
		return avail;
	dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
	xprt_complete_rqst(xprt, req, xprt->tcp_copied);

	return avail;
}

/*
 * TCP discard extra bytes from a short read
 */
static inline int
tcp_read_discard(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	static u8	dummy[64];
	int		want, result = 0;

	while (avail) {
		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len  = want;
		dprintk("RPC: TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		avail -= result;
	}
	return avail;
}

/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom thrice--
 * first receiving the record marker, then the XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = NULL;
	struct rpc_task	*task = NULL;
	int		avail, result;

	dprintk("RPC: tcp_input_record\n");

	if (xprt->shutdown)
		return -EIO;
	if (!xprt->connected)
		return -ENOTCONN;

	/* Read in a new fragment marker if necessary */
	/* Can we ever really expect to get completely empty fragments? */
	if ((result = tcp_read_fraghdr(xprt)) <= 0)
		return result;
	avail = result;

	/* Read in the xid if necessary */
	if ((result = tcp_read_xid(xprt, avail)) <= 0)
		return result;
	avail = result;

	/* Find and lock the request corresponding to this xid */
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (req) {
		task = req->rq_task;
		if (xprt->tcp_copied == sizeof(xprt->tcp_xid) || req->rq_damaged) {
			req->rq_damaged = 1;
			/* Read in the request data */
			result = tcp_read_request(xprt, req, avail);
		}
		rpc_unlock_task(task);
		if (result < 0)
			return result;
		avail = result;
	}

	/* Skip over any trailing bytes on short reads */
	if ((result = tcp_read_discard(xprt, avail)) < 0)
		return result;

	dprintk("RPC: tcp_input_record done (off %d reclen %d copied %d)\n",
			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
	result = xprt->tcp_reclen;
	return result;
}

/*
 * TCP task queue stuff
 */

LIST_HEAD(rpc_xprt_pending);	/* List of xprts having pending tcp requests */

static inline
void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}

static inline
void xprt_append_pending(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->rx_pending))
		return;
	spin_lock_bh(&rpc_queue_lock);
	if (list_empty(&xprt->rx_pending)) {
		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
		dprintk("RPC: xprt queue %p\n", xprt);
		tcp_rpciod_queue();
	}
	spin_unlock_bh(&rpc_queue_lock);
}

static
void xprt_remove_pending(struct rpc_xprt *xprt)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&xprt->rx_pending)) {
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
}

static inline
struct rpc_xprt *xprt_remove_pending_next(void)
{
	struct rpc_xprt	*xprt = NULL;

	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&rpc_xprt_pending)) {
		xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
	return xprt;
}

/*
 * This is protected from tcp_data_ready and the stack as it's run
 * inside of the RPC I/O daemon
 */
void
__rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int safe_retry = 0, result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 * Empty each pending socket
	 */
	while ((xprt = xprt_remove_pending_next()) != NULL) {
		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do {
			result = tcp_input_record(xprt);
		} while (result >= 0);

		if (safe_retry++ > 200) {
			schedule();
			safe_retry = 0;
		}
	}
}

/*
 * data_ready callback for TCP. We can't just jump into the
 * tcp recvmsg functions inside of the network receive bh or
 * bad things occur. We queue it to pick up after networking
 * is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt	*xprt;

	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("Not a socket with xprt %p\n", sk);
		return;
	}

	if (xprt->shutdown)
		return;

	xprt_append_pending(xprt);

	dprintk("RPC: tcp_data_ready client %p\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);
}

static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);

	spin_lock_bh(&xprt_sock_lock);
	switch (sk->state) {
	case TCP_ESTABLISHED:
		xprt->connected = 1;
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
			rpc_wake_up_task(xprt->snd_task);
		rpc_wake_up(&xprt->reconn);
		break;
	default:
		xprt->connected = 0;
		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		break;
	}
	spin_unlock_bh(&xprt_sock_lock);
}

/*
 * The following 2 routines allow a task to sleep while socket memory is
 * low.
 */
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if (xprt->shutdown)
		return;

	/* Wait until we have enough socket memory */
	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
		return;

	spin_lock_bh(&xprt_sock_lock);
	if (xprt->write_space)
		goto out_unlock;

	xprt->write_space = 1;

	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
		rpc_wake_up_task(xprt->snd_task);
 out_unlock:
	spin_unlock_bh(&xprt_sock_lock);
}

static void
udp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if (xprt->shutdown)
		return;

	/* Wait until we have enough socket memory */
	if (sock_wspace(sk) < min(sk->sndbuf,XPRT_MIN_WRITE_SPACE))
		return;

	spin_lock_bh(&xprt_sock_lock);
	if (xprt->write_space)
		goto out_unlock;

	xprt->write_space = 1;

	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
		rpc_wake_up_task(xprt->snd_task);
 out_unlock:
	spin_unlock_bh(&xprt_sock_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	if (req)
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}

/*
 * Serialize access to sockets, in order to prevent different
 * requests from interfering with each other.
 */
static int
xprt_down_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	spin_lock(&xprt_lock);
	if (xprt->snd_task && xprt->snd_task != task) {
		dprintk("RPC: %4d TCP write queue full (task %d)\n",
			task->tk_pid, xprt->snd_task->tk_pid);
		task->tk_timeout = 0;
		task->tk_status = -EAGAIN;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	} else if (!xprt->snd_task) {
		xprt->snd_task = task;
#ifdef RPC_PROFILE
		req->rq_xtime = jiffies;
#endif
		req->rq_bytes_sent = 0;
	}
	spin_unlock(&xprt_lock);
	return xprt->snd_task == task;
}

/*
 * Releases the socket for use by other requests.
 */
static inline void
xprt_up_transmit(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	if (xprt->snd_task && xprt->snd_task == task) {
		spin_lock(&xprt_lock);
		xprt->snd_task = NULL;
		rpc_wake_up_next(&xprt->sending);
		spin_unlock(&xprt_lock);
	}
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
				*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown)
		task->tk_status = -EIO;

	if (!xprt->connected)
		task->tk_status = -ENOTCONN;

	if (task->tk_status < 0)
		return;

	if (task->tk_rpcwait)
		rpc_remove_wait_queue(task);

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32	*marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	if (!xprt_down_transmit(task))
		return;

	do_xprt_transmit(task);
}

static void
do_xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status, retry = 0;

	/* For fast networks/servers we have to put the request on
	 * the pending list now:
	 * Note that we don't want the task timing out during the
	 * call to xprt_sendmsg(), so we initially disable the timeout,
	 * and then reset it later...
	 */
	xprt_receive(task);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		xprt->write_space = 0;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			if (req->rq_bytes_sent >= req->rq_slen)
				goto out_receive;
		} else {
			if (status >= req->rq_slen)
				goto out_receive;
			status = -ENOMEM;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		if (retry++ > 50)
			break;

		rpc_unlock_task(task);
	}
	task->tk_status = status;

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	rpc_wake_up_task(task);
	if (!RPC_IS_RUNNING(task))
		goto out_release;

	switch (status) {
	case -ENOMEM:
		/* Protect against (udp|tcp)_write_space */
		task->tk_timeout = req->rq_timeout.to_current;
		spin_lock_bh(&xprt_sock_lock);
		if (!xprt->write_space)
			rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		spin_unlock_bh(&xprt_sock_lock);
		return;
	case -EAGAIN:
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
		return;
	case -ECONNREFUSED:
	case -ENOTCONN:
		if (!xprt->stream)
			return;
	default:
		goto out_release;
	}

 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	task->tk_timeout = req->rq_timeout.to_current;
	rpc_add_timer(task, xprt_timer);
	rpc_unlock_task(task);
 out_release:
	xprt_up_transmit(task);
}

/*
 * Queue the task for a reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);

	task->tk_timeout = 0;
	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
}

/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	/* We already have an initialized request. */
	if (task->tk_rqstp)
		return 0;

	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
				task->tk_pid, xprt->cong, xprt->cwnd);
	spin_lock_bh(&xprt_sock_lock);
	xprt_reserve_status(task);
	if (task->tk_rqstp) {
		task->tk_timeout = 0;
	} else if (!task->tk_timeout) {
		task->tk_status = -ENOBUFS;
	} else {
		dprintk("RPC: xprt_reserve waiting on backlog\n");
		task->tk_status = -EAGAIN;
		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
	}
	spin_unlock_bh(&xprt_sock_lock);
	dprintk("RPC: %4d xprt_reserve returns %d\n",
				task->tk_pid, task->tk_status);
	return task->tk_status;
}

/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (xprt->shutdown) {
		task->tk_status = -EIO;
	} else if (task->tk_status < 0) {
		/* NOP */
	} else if (task->tk_rqstp) {
		/* We've already been given a request slot: NOP */
	} else {
		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
			goto out_nofree;
		/* OK: There's room for us. Grab a free slot and bump
		 * congestion value */
		xprt->free = req->rq_next;
		req->rq_next = NULL;
		xprt->cong += RPC_CWNDSCALE;
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);

		if (xprt->free)
			xprt_clear_backlog(xprt);
	}

	return;

 out_nofree:
	task->tk_status = -EAGAIN;
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	static u32	xid = 0;

	if (!xid)
		xid = CURRENT_TIME << 12;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	task->tk_status = 0;
	req->rq_timeout = xprt->timeout;
	req->rq_task    = task;
	req->rq_xprt    = xprt;
	req->rq_xid     = xid++;
	if (!xid)
		xid++;
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	xprt_up_transmit(task);
	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
		rpc_remove_wait_queue(task);
	}

	spin_lock_bh(&xprt_sock_lock);
	req->rq_next = xprt->free;
	xprt->free   = req;

	/* Decrease congestion value. */
	xprt->cong -= RPC_CWNDSCALE;

	xprt_clear_backlog(xprt);
	spin_unlock_bh(&xprt_sock_lock);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5,  5 * HZ);
	else
		xprt_set_timeout(to, 5, 60 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current   =
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_resrvval  = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
			struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	int		i;

	dprintk("RPC: setting up %s transport...\n",
				proto == IPPROTO_UDP? "UDP" : "TCP");

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND;
		xprt->nocong = 1;
	} else
		xprt->cwnd = RPC_INITCWND;
	xprt->congtime = jiffies;
	init_waitqueue_head(&xprt->cong_wait);

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
		xprt->timeout.to_resrvval = to->to_maxval << 1;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);

	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	INIT_LIST_HEAD(&xprt->rx_pending);

	dprintk("RPC: created transport %p\n", xprt);

	xprt_bind_socket(xprt, sock);
	return xprt;
}

/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int		err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}

static int
xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock	*sk = sock->sk;

	if (xprt->inet)
		return -EBUSY;

	sk->user_data = xprt;
	xprt->old_data_ready = sk->data_ready;
	xprt->old_state_change = sk->state_change;
	xprt->old_write_space = sk->write_space;
	if (xprt->prot == IPPROTO_UDP) {
		sk->data_ready = udp_data_ready;
		sk->write_space = udp_write_space;
		sk->no_check = UDP_CSUM_NORCV;
		xprt->connected = 1;
	} else {
		sk->data_ready = tcp_data_ready;
		sk->state_change = tcp_state_change;
		sk->write_space = tcp_write_space;
		xprt->connected = 0;
	}

	/* Reset to new socket */
	xprt->sock = sock;
	xprt->inet = sk;
	/*
	 * TCP requires the rpc I/O daemon to be present
	 */
	if (xprt->stream)
		rpciod_up();

	return 0;
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct rpc_timeout *to)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC: xprt_create_socket(%s %d)\n",
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		goto failed;
	}

	/* If the caller has the capability, bind to a reserved port */
	if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)
		goto failed;

	return sock;

failed:
	sock_release(sock);
	return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	struct rpc_xprt	*xprt;

	dprintk("RPC: xprt_create_proto called\n");

	if (!(sock = xprt_create_socket(proto, to)))
		return NULL;

	if (!(xprt = xprt_setup(sock, proto, sap, to)))
		sock_release(sock);

	return xprt;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	rpc_wake_up(&xprt->reconn);
	wake_up(&xprt->cong_wait);
}

/*
 * Clear the xprt backlog queue
 */
int
xprt_clear_backlog(struct rpc_xprt *xprt) {
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	rpc_wake_up_next(&xprt->backlog);
	wake_up(&xprt->cong_wait);
	return 1;
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_shutdown(xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}