/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -   When a process places a call, it allocates a request slot if
 *      one is available. Otherwise, it sleeps on the backlog queue
 *      (xprt_reserve).
 *  -   Next, the caller puts together the RPC message, stuffs it into
 *      the request struct, and calls xprt_call().
 *  -   xprt_call transmits the message and installs the caller on the
 *      socket's wait list. At the same time, it installs a timer that
 *      is run after the packet's timeout has expired.
 *  -   When a packet arrives, the data_ready handler walks the list of
 *      pending requests for that socket. If a matching XID is found, the
 *      caller is woken up, and the timer removed.
 *  -   When no reply arrives within the timeout interval, the timer is
 *      fired by the kernel and runs xprt_timer(). It either adjusts the
 *      timeout values (minor timeout) or wakes up the caller with a status
 *      of -ETIMEDOUT.
 *  -   When the caller receives a notification from RPC that a reply arrived,
 *      it should release the RPC slot, and process the reply.
 *      If the call timed out, it may choose to retry the operation by
 *      adjusting the initial timeout value, and simply calling rpc_call
 *      again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */
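
/*
 * A rough sketch of the flow described above, for orientation only:
 * the real sequencing lives in the rpc_call machinery of clnt.c,
 * which drives these helpers on behalf of an rpc_task rather than
 * calling them back to back like this:
 *
 *      xprt_reserve(task);     // get a request slot, or sleep on backlog
 *      // ... encode the call into task->tk_rqstp ...
 *      xprt_transmit(task);    // send it, put task on xprt->pending
 *      xprt_receive(task);     // sleep until reply arrives or timer fires
 *      // ... decode the reply ...
 *      xprt_release(task);     // free the slot, wake next backlogged task
 */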

#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/config.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>

#include <asm/uaccess.h>

#define SOCK_HAS_USER_DATA

/*
 * Local variables
 */
#ifndef SOCK_HAS_USER_DATA
static struct rpc_xprt *sock_list = NULL;
#endif

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY        RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)      ((a) > (b)? (a) : (b))
# define MIN(a, b)      ((a) < (b)? (a) : (b))
#endif

/*
 * Local functions
 */
static void     xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void     xprt_transmit_status(struct rpc_task *task);
static void     xprt_receive_status(struct rpc_task *task);
static void     xprt_reserve_status(struct rpc_task *task);
static void     xprt_reconn_timeout(struct rpc_task *task);
static void     xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
                                        struct rpc_timeout *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        u8      *buf = (u8 *) packet;
        int     j;

        dprintk("RPC: %s\n", msg);
        for (j = 0; j < count && j < 128; j += 4) {
                if (!(j & 31)) {
                        if (j)
                                dprintk("\n");
                        dprintk("0x%04x ", j);
                }
                dprintk("%02x%02x%02x%02x ",
                        buf[j], buf[j+1], buf[j+2], buf[j+3]);
        }
        dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
        /* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
#ifndef SOCK_HAS_USER_DATA
        struct rpc_xprt *xprt;

        for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
                ;
        return xprt;
#else
        return (struct rpc_xprt *) sk->user_data;
#endif
}

/*
 * Adjust the iovec to move on 'n' bytes
 */
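
/*
 * Worked example (illustrative only): with two iovecs of 100 and 200
 * bytes and amount == 150, the first iovec is consumed entirely and
 * niv[0] becomes the last 150 bytes of the second one, so a resumed
 * sendmsg() after a partial write starts exactly where the socket
 * stopped.
 */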
extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
        struct iovec *iv = msg->msg_iov;

        /*
         * Eat any sent iovecs
         */
        while (iv->iov_len < amount)
        {
                amount -= iv->iov_len;
                iv++;
                msg->msg_iovlen--;
        }
        msg->msg_iov = niv;

        /*
         * And chew down the partial one
         */
        niv[0].iov_len = iv->iov_len - amount;
        niv[0].iov_base = ((unsigned char *)iv->iov_base) + amount;
        iv++;

        /*
         * And copy any others
         */
        for (amount = 1; amount < msg->msg_iovlen; amount++)
        {
                niv[amount] = *iv++;
        }
}

/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt)
{
        struct socket   *sock = xprt->sock;
        struct msghdr   msg;
        mm_segment_t    oldfs;
        int             result;
        struct iovec    niv[MAX_IOVEC];

        xprt_pktdump("packet data:",
                        xprt->snd_buf.io_vec->iov_base,
                        xprt->snd_buf.io_vec->iov_len);

        msg.msg_flags      = MSG_DONTWAIT;
        msg.msg_iov        = xprt->snd_buf.io_vec;
        msg.msg_iovlen     = xprt->snd_buf.io_nr;
        msg.msg_name       = (struct sockaddr *) &xprt->addr;
        msg.msg_namelen    = sizeof(xprt->addr);
        msg.msg_control    = NULL;
        msg.msg_controllen = 0;

        /* Don't repeat bytes */
        if (xprt->snd_sent)
                xprt_move_iov(&msg, niv, xprt->snd_sent);

        oldfs = get_fs(); set_fs(get_ds());
        result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
        set_fs(oldfs);

        dprintk("RPC: xprt_sendmsg(%d) = %d\n",
                        xprt->snd_buf.io_len, result);

        if (result >= 0) {
                xprt->snd_buf.io_len -= result;
                xprt->snd_sent += result;
                return result;
        }

        switch (result) {
        case -ECONNREFUSED:
                /* When the server has died, an ICMP port unreachable message
                 * prompts ECONNREFUSED. */
                break;
        case -EAGAIN:
                return 0;
        case -ENOTCONN: case -EPIPE:
                /* connection broken */
                break;
        default:
                printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
                result = 0;
        }
        return result;
}

/*
 * Read data from socket
 */
static inline int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
{
        struct socket   *sock = xprt->sock;
        struct sockaddr_in sin;
        struct msghdr   msg;
        mm_segment_t    oldfs;
        int             result;

#if LINUX_VERSION_CODE >= 0x020100
        msg.msg_flags      = MSG_DONTWAIT;
        msg.msg_iov        = iov;
        msg.msg_iovlen     = nr;
        msg.msg_name       = &sin;
        msg.msg_namelen    = sizeof(sin);
        msg.msg_control    = NULL;
        msg.msg_controllen = 0;

        oldfs = get_fs(); set_fs(get_ds());
        result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
        set_fs(oldfs);
#else
        int             alen = sizeof(sin);
        msg.msg_flags      = 0;
        msg.msg_iov        = iov;
        msg.msg_iovlen     = nr;
        msg.msg_name       = &sin;
        msg.msg_namelen    = sizeof(sin);
        msg.msg_control    = NULL;
        msg.msg_controllen = 0;

        oldfs = get_fs(); set_fs(get_ds());
        result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
        set_fs(oldfs);
#endif

        dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
                        iov, len, result);
        return result;
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
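
/*
 * The update below is additive-increase/multiplicative-decrease in
 * the spirit of Van Jacobson's TCP congestion avoidance: a good reply
 * grows cwnd by roughly RPC_CWNDSCALE^2/cwnd (about one request slot
 * per cwnd/RPC_CWNDSCALE replies), while a timeout halves it. The
 * congtime stamp additionally rate-limits how often cwnd may grow.
 */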
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long   cwnd = xprt->cwnd;

        if (xprt->nocong)
                return;
        if (result >= 0) {
                if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
                        return;
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND)
                        cwnd = RPC_MAXCWND;
                else
                        pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
                xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
                dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
                        "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
                        (xprt->congtime-jiffies)*1000/HZ);
        } else if (result == -ETIMEDOUT) {
                if ((cwnd >>= 1) < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
                xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
                dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
                        "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
                        (xprt->congtime-jiffies)*1000/HZ);
                pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
        }

        xprt->cwnd = cwnd;
}

/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
        if (to->to_exponential)
                to->to_current <<= 1;
        else
                to->to_current += to->to_increment;
        if (to->to_maxval && to->to_current >= to->to_maxval) {
                to->to_current = to->to_maxval;
                to->to_retries = 0;
        }
        if (!to->to_current) {
                printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
                to->to_current = 5 * HZ;
        }
        pprintk("RPC: %lu %s\n", jiffies,
                        to->to_retries? "retrans" : "timeout");
        return (to->to_retries)--;
}

/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
        struct sock     *sk = xprt->inet;

#ifdef SOCK_HAS_USER_DATA
        sk->user_data    = NULL;
#endif
        sk->data_ready   = xprt->old_data_ready;
        sk->no_check     = 0;
        sk->state_change = xprt->old_state_change;
        sk->write_space  = xprt->old_write_space;

        sock_release(xprt->sock);
        /*
         * TCP doesn't require the rpciod now - other things may
         * but rpciod handles that not us.
         */
        if (xprt->stream && !xprt->connecting)
                rpciod_down();
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC: disconnected transport %p\n", xprt);
        rpc_wake_up_status(&xprt->pending, -ENOTCONN);
        rpc_wake_up_status(&xprt->sending, -ENOTCONN);
        xprt->connected = 0;
}

/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct socket   *sock;
        struct sock     *inet;
        int             status;

        dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
                        task->tk_pid, xprt, xprt->connected);
        task->tk_status = 0;

        if (xprt->connecting) {
                task->tk_timeout = xprt->timeout.to_maxval;
                rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
                return;
        }
        xprt->connecting = 1;

        /* Create an unconnected socket */
        if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
                goto defer;

#if LINUX_VERSION_CODE >= 0x020100
        inet = sock->sk;
#else
        inet = (struct sock *) sock->data;
#endif
        inet->data_ready   = xprt->inet->data_ready;
        inet->state_change = xprt->inet->state_change;
        inet->write_space  = xprt->inet->write_space;
#ifdef SOCK_HAS_USER_DATA
        inet->user_data    = xprt;
#endif

        dprintk("RPC: %4d closing old socket\n", task->tk_pid);
        xprt_disconnect(xprt);
        xprt_close(xprt);

        /* Reset to new socket and default congestion */
        xprt->sock = sock;
        xprt->inet = inet;
        xprt->cwnd = RPC_INITCWND;

        /* Now connect it asynchronously. */
        dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                        sizeof(xprt->addr), O_NONBLOCK);
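
        /* A nonblocking connect normally returns -EINPROGRESS (or
         * -EALREADY if one is still pending); completion is signalled
         * asynchronously via tcp_state_change(). Anything else is a
         * genuine error. */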
        if (status < 0) {
                if (status != -EINPROGRESS && status != -EALREADY) {
                        printk("RPC: TCP connect error %d!\n", -status);
                        goto defer;
                }

                dprintk("RPC: %4d connect status %d connected %d\n",
                                task->tk_pid, status, xprt->connected);
                task->tk_timeout = 60 * HZ;

                start_bh_atomic();
                if (!xprt->connected) {
                        rpc_sleep_on(&xprt->reconn, task,
                                        xprt_reconn_status, xprt_reconn_timeout);
                        end_bh_atomic();
                        return;
                }
                end_bh_atomic();
        }

        xprt->connecting = 0;
        rpc_wake_up(&xprt->reconn);
        return;

defer:
        task->tk_timeout = 30 * HZ;
        rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
        xprt->connecting = 0;
}

/*
 * Reconnect status
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_reconn_status %d\n",
                        task->tk_pid, task->tk_status);
        if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
                task->tk_timeout = 30 * HZ;
                rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
        }
}

/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_timeout(struct rpc_task *task)
{
        dprintk("RPC: %4d xprt_reconn_timeout %d\n",
                        task->tk_pid, task->tk_status);
        task->tk_status = -ENOTCONN;
        task->tk_xprt->connecting = 0;
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
}

/*
 * Look up the RPC request corresponding to a reply.
 */
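
/*
 * The pending queue is walked as a circular list through tk_next;
 * the cap of 100 iterations below is purely defensive, so that a
 * corrupted queue gets reported instead of being looped over
 * forever in a bottom half.
 */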
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
        struct rpc_task *head, *task;
        struct rpc_rqst *req;
        int             safe = 0;

        if ((head = xprt->pending.task) != NULL) {
                task = head;
                do {
                        if ((req = task->tk_rqstp) && req->rq_xid == xid)
                                return req;
                        task = task->tk_next;
                        if (++safe > 100) {
                                printk("xprt_lookup_rqst: loop in Q!\n");
                                return NULL;
                        }
                } while (task != head);
        }
        dprintk("RPC: unknown XID %08x in reply.\n", xid);
        return NULL;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;

        req->rq_rlen  = copied;
        req->rq_gotit = 1;

        /* Adjust congestion window */
        xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long    nextstat = 0;
                static unsigned long    pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        /* ... and wake up the process. */
        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        task->tk_status = copied;

        rpc_wake_up_task(task);
        return;
}

/* We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec. -DaveM
 */
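
/*
 * Note the 'slack' handling below: the datagram may contain more
 * data than the 'copied' bytes delivered into the iovec, and those
 * trailing bytes must still be folded into the sum, or verification
 * of the full UDP checksum would fail.
 */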
static int csum_partial_copy_to_page_cache(struct iovec *iov,
                                           struct sk_buff *skb,
                                           int copied)
{
        __u8 *pkt_data = skb->data + sizeof(struct udphdr);
        __u8 *cur_ptr = iov->iov_base;
        __kernel_size_t cur_len = iov->iov_len;
        unsigned int csum = skb->csum;
        int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
        int slack = skb->len - copied - sizeof(struct udphdr);

        if (need_csum)
                csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
        while (copied > 0) {
                if (cur_len) {
                        int to_move = cur_len;
                        if (to_move > copied)
                                to_move = copied;
                        if (need_csum)
                                csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
                                                                 to_move, csum);
                        else
                                memcpy(cur_ptr, pkt_data, to_move);
                        pkt_data += to_move;
                        copied -= to_move;
                        cur_ptr += to_move;
                        cur_len -= to_move;
                }
                if (cur_len <= 0) {
                        iov++;
                        cur_len = iov->iov_len;
                        cur_ptr = iov->iov_base;
                }
        }
        if (need_csum) {
                if (slack > 0)
                        csum = csum_partial(pkt_data, slack, csum);
                if ((unsigned short)csum_fold(csum))
                        return -1;
        }
        return 0;
}

/* Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static inline void
udp_data_ready(struct sock *sk, int len)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *rovr;
        struct sk_buff  *skb;
        int             err, repsize, copied;

        dprintk("RPC: udp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
                return;
        dprintk("RPC: udp_data_ready client %p\n", xprt);

        if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
                return;

        repsize = skb->len - sizeof(struct udphdr);
        if (repsize < 4) {
                printk("RPC: impossible RPC reply size %d!\n", repsize);
                goto dropit;
        }

        /* Look up the request corresponding to the given XID */
        if (!(rovr = xprt_lookup_rqst(xprt,
                                *(u32 *) (skb->h.raw + sizeof(struct udphdr)))))
                goto dropit;

        dprintk("RPC: %4d received reply\n", rovr->rq_task->tk_pid);
        xprt_pktdump("packet data:",
                        (u32 *) (skb->h.raw + sizeof(struct udphdr)), repsize);

        if ((copied = rovr->rq_rlen) > repsize)
                copied = repsize;

        /* Suck it into the iovec, verify checksum if not done by hw. */
        if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
                goto dropit;

        /* Something worked... */
        dst_confirm(skb->dst);

        xprt_complete_rqst(xprt, rovr, copied);

dropit:
        skb_free_datagram(sk, skb);
        return;
}

/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom twice--
 * first receiving the record marker and XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
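
/*
 * Record marking (RFC 1057): each record on the stream is preceded
 * by a 4-byte marker whose top bit flags the last fragment and whose
 * low 31 bits give the fragment length. The marker plus the reply's
 * XID (8 bytes in all) are read into xprt->tcp_recm before the body
 * is touched.
 */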
static inline int
tcp_input_record(struct rpc_xprt *xprt)
{
        struct rpc_rqst *req;
        struct iovec    *iov;
        struct iovec    riov;
        u32             offset;
        int             result, maxcpy, reclen, avail, want;

        dprintk("RPC: tcp_input_record\n");
        offset = xprt->tcp_offset;
        result = -EAGAIN;
        if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
                want = (xprt->tcp_more? 4 : 8) - offset;
                dprintk("RPC: reading header (%d bytes)\n", want);
                riov.iov_base = xprt->tcp_recm.data + offset;
                riov.iov_len  = want;
                result = xprt_recvmsg(xprt, &riov, 1, want);
                if (!result)
                {
                        dprintk("RPC: empty TCP record.\n");
                        return -ENOTCONN;
                }
                if (result < 0)
                        goto done;
                offset += result;
                if (result < want) {
                        result = -EAGAIN;
                        goto done;
                }

                /* Get the record length and mask out the more_fragments bit */
                reclen = ntohl(xprt->tcp_reclen);
                dprintk("RPC: reclen %08x\n", reclen);
                xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
                reclen &= 0x7fffffff;
                xprt->tcp_total += reclen;
                xprt->tcp_reclen = reclen;

                dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
                        xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
                if (!xprt->tcp_copied
                 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
                        iov = xprt->tcp_iovec;
                        memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
#if 0
                        *(u32 *)iov->iov_base = req->rq_xid;
#endif
                        iov->iov_base += 4;
                        iov->iov_len  -= 4;
                        xprt->tcp_copied = 4;
                        xprt->tcp_rqstp  = req;
                }
        } else {
                reclen = xprt->tcp_reclen;
        }

        avail = reclen - (offset - 4);
        if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
         && req->rq_task->tk_rpcwait == &xprt->pending) {
                want = MIN(req->rq_rlen - xprt->tcp_copied, avail);

                dprintk("RPC: %4d TCP receiving %d bytes\n",
                        req->rq_task->tk_pid, want);
                result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
                if (!result && want)
                        result = -EAGAIN;
                if (result < 0)
                        goto done;
                xprt->tcp_copied += result;
                offset += result;
                avail  -= result;
                if (result < want) {
                        result = -EAGAIN;
                        goto done;
                }

                maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
                if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
                        dprintk("RPC: %4d received reply complete\n",
                                        req->rq_task->tk_pid);
                        xprt_complete_rqst(xprt, req, xprt->tcp_total);
                        xprt->tcp_copied = 0;
                        xprt->tcp_rqstp  = NULL;
                }
                /* Request must be re-encoded before retransmit */
                req->rq_damaged = 1;
        }

        /* Skip over any trailing bytes on short reads */
        while (avail) {
                static u8       dummy[64];

                want = MIN(avail, sizeof(dummy));
                riov.iov_base = dummy;
                riov.iov_len  = want;
                dprintk("RPC: TCP skipping %d bytes\n", want);
                result = xprt_recvmsg(xprt, &riov, 1, want);
                if (!result && want)
                        result = -EAGAIN;
                if (result < 0)
                        goto done;
                offset += result;
                avail  -= result;
                if (result < want) {
                        result = -EAGAIN;
                        goto done;
                }
        }
        if (!xprt->tcp_more)
                xprt->tcp_total = 0;
        offset = 0;

done:
        dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
                        offset, xprt->tcp_total, xprt->tcp_copied);
        xprt->tcp_offset = offset;
        return result;
}

/*
 * TCP task queue stuff
 */

static struct rpc_xprt *rpc_xprt_pending = NULL;  /* Chain by rx_pending of rpc_xprt's */

/*
 * This is protected from tcp_data_ready and the stack as it's run
 * inside of the RPC I/O daemon
 */
void rpciod_tcp_dispatcher(void)
{
        struct rpc_xprt *xprt;
        int result;

        dprintk("rpciod_tcp_dispatcher: Queue Running\n");

        /*
         * Empty each pending socket
         */
        while ((xprt = rpc_xprt_pending) != NULL)
        {
                int safe_retry = 0;

                rpc_xprt_pending = xprt->rx_pending;
                xprt->rx_pending_flag = 0;

                dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

                do
                {
                        if (safe_retry++ > 50)
                                break;
                        result = tcp_input_record(xprt);
                }
                while (result >= 0);

                switch (result) {
                case -EAGAIN:
                        continue;
                case -ENOTCONN:
                case -EPIPE:
                        xprt_disconnect(xprt);
                        continue;
                default:
                        printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
                                result);
                }
        }
}

extern inline void tcp_rpciod_queue(void)
{
        rpciod_wake_up();
}

/*
 * data_ready callback for TCP. We can't just jump into the
 * tcp recvmsg functions inside of the network receive bh or
 * bad things occur. We queue it to pick up after networking
 * is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
        struct rpc_xprt *xprt;

        dprintk("RPC: tcp_data_ready...\n");
        if (!(xprt = xprt_from_sock(sk)))
        {
                printk("Not a socket with xprt %p\n", sk);
                return;
        }
        dprintk("RPC: tcp_data_ready client %p\n", xprt);
        dprintk("RPC: state %x conn %d dead %d zapped %d\n",
                        sk->state, xprt->connected,
                        sk->dead, sk->zapped);
        /*
         * If we are not waiting for the RPC bh run then
         * we are now
         */
        if (!xprt->rx_pending_flag)
        {
                int start_queue = 0;

                dprintk("RPC: xprt queue %p\n", rpc_xprt_pending);
                if (rpc_xprt_pending == NULL)
                        start_queue = 1;
                xprt->rx_pending_flag = 1;
                xprt->rx_pending = rpc_xprt_pending;
                rpc_xprt_pending = xprt;
                if (start_queue)
                {
                        tcp_rpciod_queue();
                        start_queue = 0;
                }
        }
        else
                dprintk("RPC: xprt queued already %p\n", xprt);
}

static void
tcp_state_change(struct sock *sk)
{
        struct rpc_xprt *xprt;

        if (!(xprt = xprt_from_sock(sk)))
                return;
        dprintk("RPC: tcp_state_change client %p...\n", xprt);
        dprintk("RPC: state %x conn %d dead %d zapped %d\n",
                        sk->state, xprt->connected,
                        sk->dead, sk->zapped);

        if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
                xprt->connected = 1;
                xprt->connecting = 0;
                rpc_wake_up(&xprt->reconn);
        } else if (sk->zapped) {
                rpc_wake_up_status(&xprt->pending, -ENOTCONN);
                rpc_wake_up_status(&xprt->sending, -ENOTCONN);
                rpc_wake_up_status(&xprt->reconn,  -ENOTCONN);
        }
}

static void
tcp_write_space(struct sock *sk)
{
        struct rpc_xprt *xprt;

        if (!(xprt = xprt_from_sock(sk)))
                return;
        if (xprt->snd_sent && xprt->snd_task)
                dprintk("RPC: write space\n");
        if (xprt->write_space == 0)
        {
                xprt->write_space = 1;
                if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
                {
                        if (xprt->snd_sent)
                                dprintk("RPC: Write wakeup snd_sent =%d\n",
                                        xprt->snd_sent);
                        rpc_wake_up_task(xprt->snd_task);
                }
        }
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        if (req) {
                xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
        }

        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status  = -ETIMEDOUT;
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
}

/*
 * (Partly) transmit the RPC packet
 * Note that task->tk_status is either 0 or negative on return.
 * Only when the reply is received will the status be set to a
 * positive value.
 */
static inline int
xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        int             result;

        task->tk_status = 0;
        if ((result = xprt_sendmsg(xprt)) >= 0) {
                if (!xprt->snd_buf.io_len || !xprt->stream) {
                        rpc_wake_up_next(&xprt->sending);
                        return req->rq_slen;
                }
                result = -EAGAIN;
        } else if (xprt->stream) {
                if (result == -ENOTCONN || result == -EPIPE) {
                        xprt_disconnect(xprt);
                        result = -ENOTCONN;
                }
        }
        return task->tk_status = result;
}

/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
        struct rpc_timeout *timeo;
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
                        *(u32 *)(req->rq_svec[0].iov_base));

        if (xprt->shutdown) {
                task->tk_status = -EIO;
                return;
        }

        /* If we're not already in the process of transmitting our call,
         * set up everything as needed. */
        if (xprt->snd_task != task) {
                /* Write the record marker */
                if (xprt->stream) {
                        u32     marker;

                        if (!xprt->connected) {
                                task->tk_status = -ENOTCONN;
                                return;
                        }
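
                        /* The XDR encoder leaves the first 4 bytes of
                         * the send buffer free for this marker: high
                         * bit set means final fragment, the low 31
                         * bits hold the length of the rest of the
                         * record (hence rq_slen - 4). */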
                        marker = htonl(0x80000000|(req->rq_slen-4));
                        *((u32 *) req->rq_svec[0].iov_base) = marker;
                }

                /* Reset timeout parameters */
                timeo = &req->rq_timeout;
                if (timeo->to_retries < 0) {
                        dprintk("RPC: %4d xprt_transmit reset timeo\n",
                                        task->tk_pid);
                        timeo->to_retries = xprt->timeout.to_retries;
                        timeo->to_current = timeo->to_initval;
                }

#ifdef RPC_PROFILE
                req->rq_xtime = jiffies;
#endif
                req->rq_gotit = 0;

                if (xprt->snd_task) {
                        dprintk("RPC: %4d TCP write queue full (task %d)\n",
                                        task->tk_pid, xprt->snd_task->tk_pid);
                        rpc_sleep_on(&xprt->sending, task,
                                        xprt_transmit_status, NULL);
                        return;
                }
                xprt->snd_buf  = req->rq_snd_buf;
                xprt->snd_task = task;
                xprt->snd_sent = 0;
        }

        /* For fast networks/servers we have to put the request on
         * the pending list now:
         */
        start_bh_atomic();
        status = rpc_add_wait_queue(&xprt->pending, task);
        if (!status)
                task->tk_callback = NULL;
        end_bh_atomic();

        if (status)
        {
                printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
                task->tk_status = status;
                return;
        }

        /* Continue transmitting the packet/record. We must be careful
         * to cope with writespace callbacks arriving _after_ we have
         * called xprt_sendmsg().
         */
        while (1) {
                xprt->write_space = 0;
                if (xprt_transmit_some(xprt, task) != -EAGAIN) {
                        dprintk("RPC: %4d xmit complete\n", task->tk_pid);
                        xprt->snd_task = NULL;
                        return;
                }

                dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
                                task->tk_pid, xprt->snd_buf.io_len,
                                req->rq_slen);
                task->tk_status = 0;
                start_bh_atomic();
                if (!xprt->write_space) {
                        /* Remove from pending */
                        rpc_remove_wait_queue(task);
                        rpc_sleep_on(&xprt->sending, task,
                                        xprt_transmit_status, NULL);
                        end_bh_atomic();
                        return;
                }
                end_bh_atomic();
        }
}

/*
 * This callback is invoked when the sending task is forced to sleep
 * because the TCP write buffers are full
 */
static void
xprt_transmit_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_client->cl_xprt;

        dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
        if (xprt->snd_task == task)
        {
                if (task->tk_status < 0)
                {
                        xprt->snd_task = NULL;
                        xprt_disconnect(xprt);
                }
                else
                        xprt_transmit(task);
        }
}

/*
 * Wait for the reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
        if (xprt->connected == 0) {
                task->tk_status = -ENOTCONN;
                return;
        }

        /*
         * Wait until rq_gotit goes non-null, or timeout elapsed.
         */
        task->tk_timeout = req->rq_timeout.to_current;

        start_bh_atomic();
        if (!req->rq_gotit) {
                rpc_sleep_on(&xprt->pending, task,
                                xprt_receive_status, xprt_timer);
        }
        end_bh_atomic();

        dprintk("RPC: %4d xprt_receive returns %d\n",
                        task->tk_pid, task->tk_status);
}

static void
xprt_receive_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
                xprt->tcp_rqstp = NULL;
}

/*
 * Reserve an RPC call slot.
 */
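
/*
 * A slot is handed out only while the congestion accounting allows:
 * every request in flight holds RPC_CWNDSCALE worth of xprt->cong,
 * which RPCXPRT_CONGESTED() compares against the current congestion
 * window. Tasks that cannot get a slot sleep on the backlog queue.
 */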
int
xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        /* We already have an initialized request. */
        if (task->tk_rqstp)
                return 0;

        dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
                xprt_reserve_status(task);
                task->tk_timeout = 0;
        } else if (!task->tk_timeout) {
                task->tk_status = -ENOBUFS;
        } else {
                dprintk("RPC: xprt_reserve waiting on backlog\n");
                rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
        }
        dprintk("RPC: %4d xprt_reserve returns %d\n",
                        task->tk_pid, task->tk_status);
        return task->tk_status;
}

/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (xprt->shutdown) {
                task->tk_status = -EIO;
        } else if (task->tk_status < 0) {
                /* NOP */
        } else if (task->tk_rqstp) {
                /* We've already been given a request slot: NOP */
        } else if (!RPCXPRT_CONGESTED(xprt)) {
                /* OK: There's room for us. Grab a free slot and bump
                 * congestion value */
                req = xprt->free;
                if (!req)
                        goto bad_list;
                if (req->rq_xid)
                        goto bad_used;
                xprt->free     = req->rq_next;
                xprt->cong    += RPC_CWNDSCALE;
                task->tk_rqstp = req;
                req->rq_next   = NULL;
                xprt_request_init(task, xprt);
        } else {
                task->tk_status = -EAGAIN;
        }

        if (xprt->free && !RPCXPRT_CONGESTED(xprt))
                rpc_wake_up_next(&xprt->backlog);

        return;

bad_list:
        printk(KERN_ERR
                "RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
                task->tk_pid, xprt->cong, xprt->cwnd);
        rpc_debug = ~0;
        goto bummer;
bad_used:
        printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
bummer:
        task->tk_status = -EIO;
        xprt->free = NULL;
        return;
}

/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;
        static u32      xid = 0;
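
        /* Seed the XID sequence from the clock on first use, so that
         * XIDs from a rebooted client are unlikely to collide with
         * those of its previous incarnation (the << 12 leaves room
         * for 4096 calls per second before sequences overlap). */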
        if (!xid)
                xid = CURRENT_TIME << 12;

        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
        task->tk_status = 0;
        req->rq_gotit   = 0;
        req->rq_timeout = xprt->timeout;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xid++;
        if (!xid)
                xid++;
}

/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        /* remove slot from queue of pending */
        start_bh_atomic();
        if (task->tk_rpcwait) {
                printk("RPC: task of released request still queued!\n");
#ifdef RPC_DEBUG
                printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
#endif
                rpc_del_timer(task);
                rpc_remove_wait_queue(task);
        }
        end_bh_atomic();

        /* Decrease congestion value. */
        xprt->cong -= RPC_CWNDSCALE;

#if 0
        /* If congestion threshold is not yet reached, pass on the request slot.
         * This looks kind of kludgy, but it guarantees backlogged requests
         * are served in order.
         * N.B. This doesn't look completely safe, as the task is still
         * on the backlog list after wake-up.
         */
        if (!RPCXPRT_CONGESTED(xprt)) {
                struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);

                if (next && next->tk_rqstp == 0) {
                        xprt->cong += RPC_CWNDSCALE;
                        next->tk_rqstp = req;
                        xprt_request_init(next, xprt);
                        return;
                }
        }
#endif

        req->rq_next = xprt->free;
        xprt->free   = req;

        /* If not congested, wake up the next backlogged process */
        if (!RPCXPRT_CONGESTED(xprt))
                rpc_wake_up_next(&xprt->backlog);
}

/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
        if (proto == IPPROTO_UDP)
                xprt_set_timeout(to, 5, 5 * HZ);
        else
                xprt_set_timeout(to, 5, 15 * HZ);
}

/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        to->to_current   =
        to->to_initval   =
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
        to->to_resrvval  = incr * retr;
        to->to_retries   = retr;
        to->to_exponential = 0;
}

/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
                        struct sockaddr_in *ap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;
        struct sock     *inet;
        int             i;

        dprintk("RPC: setting up %s transport...\n",
                        proto == IPPROTO_UDP? "UDP" : "TCP");

#if LINUX_VERSION_CODE >= 0x020100
        inet = sock->sk;
#else
        inet = (struct sock *) sock->data;
#endif

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return NULL;
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->file = NULL;
        xprt->sock = sock;
        xprt->inet = inet;
        xprt->addr = *ap;
        xprt->prot = proto;
        xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
        xprt->cwnd = RPC_INITCWND;
#ifdef SOCK_HAS_USER_DATA
        inet->user_data = xprt;
#else
        xprt->link = sock_list;
        sock_list = xprt;
#endif
        xprt->old_data_ready   = inet->data_ready;
        xprt->old_state_change = inet->state_change;
        xprt->old_write_space  = inet->write_space;
        if (proto == IPPROTO_UDP) {
                inet->data_ready = udp_data_ready;
                inet->no_check   = UDP_CSUM_NORCV;
        } else {
                inet->data_ready   = tcp_data_ready;
                inet->state_change = tcp_state_change;
                inet->write_space  = tcp_write_space;
                xprt->nocong = 1;
        }
        xprt->connected = 1;

        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
                xprt->timeout.to_current = to->to_initval;
                xprt->timeout.to_resrvval = to->to_maxval << 1;
        } else {
                xprt_default_timeout(&xprt->timeout, xprt->prot);
        }

        xprt->pending = RPC_INIT_WAITQ("xprt_pending");
        xprt->sending = RPC_INIT_WAITQ("xprt_sending");
        xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
        xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

        /* initialize free list */
        for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
                req->rq_next = req + 1;
        req->rq_next = NULL;
        xprt->free = xprt->slot;

        dprintk("RPC: created transport %p\n", xprt);

        /*
         * TCP requires that the rpc I/O daemon is present
         */
        if (proto == IPPROTO_TCP)
                rpciod_up();
        return xprt;
}

/*
 * Bind to a reserved port
 */
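
/*
 * Ports below 1024 may only be bound by privileged processes, and
 * NFS servers typically insist that requests originate from such a
 * port. The loop below simply scans downward from port 800 until an
 * unused port is found.
 */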
static inline int
xprt_bindresvport(struct socket *sock)
{
        struct sockaddr_in myaddr;
        int             err, port;

        memset(&myaddr, 0, sizeof(myaddr));
        myaddr.sin_family = AF_INET;
        port = 800;
        do {
                myaddr.sin_port = htons(port);
                err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
                                sizeof(myaddr));
        } while (err == -EADDRINUSE && --port > 0);

        if (err < 0)
                printk("RPC: Can't bind to reserved port (%d).\n", -err);

        return err;
}

/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct socket   *sock;
        int             type, err;

        dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
                        sap? ntohl(sap->sin_addr.s_addr) : 0,
                        (proto == IPPROTO_UDP)? "udp" : "tcp", proto);

        type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
        if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
                printk("RPC: can't create socket (%d).\n", -err);
                goto failed;
        }

        /* If the caller has root privs, bind to a reserved port */
        if (!current->fsuid && xprt_bindresvport(sock) < 0)
                goto failed;

        if (type == SOCK_STREAM && sap) {
                err = sock->ops->connect(sock, (struct sockaddr *) sap,
                                sizeof(*sap), 0);
                if (err < 0) {
                        printk("RPC: TCP connect failed (%d).\n", -err);
                        goto failed;
                }
        }

        return sock;

failed:
        sock_release(sock);
        return NULL;
}

/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct socket   *sock;
        struct rpc_xprt *xprt;

        dprintk("RPC: xprt_create_proto called\n");

        if (!(sock = xprt_create_socket(proto, sap, to)))
                return NULL;

        if (!(xprt = xprt_setup(sock, proto, sap, to)))
                sock_release(sock);

        return xprt;
}

/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->pending);
        rpc_wake_up(&xprt->backlog);
        rpc_wake_up(&xprt->reconn);
}

/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
#ifndef SOCK_HAS_USER_DATA
        struct rpc_xprt **q;

        for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
                ;
        if (!*q) {
                printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
                return -EIO;    /* why is there no EBUGGYSOFTWARE */
        }
        *q = xprt->link;
#endif

        dprintk("RPC: destroying transport %p\n", xprt);
        xprt_close(xprt);
        kfree(xprt);

        return 0;
}