/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 */
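/*
 * Editor's illustration (not part of the original file): the sequence a
 * synchronous call takes through this interface, using only functions
 * defined below. The function name and surrounding setup are
 * hypothetical; error handling and retransmission are omitted.
 */
#if 0
static void example_call_sequence(struct rpc_task *task)
{
	if (xprt_reserve(task) < 0)	/* 1. grab a request slot, or sleep
					 *    on the backlog queue */
		return;
	/* 2. higher layers encode the RPC message into task->tk_rqstp */
	xprt_transmit(task);		/* 3. push the request onto the wire */
	xprt_receive(task);		/* 4. wait for the matching reply,
					 *    or for xprt_timer() to fire */
	xprt_release(task);		/* 5. hand the slot to the next task */
}
#endif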
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>

#include <asm/uaccess.h>
#define SOCK_HAS_USER_DATA

/*
 * Local variables
 */
#ifndef SOCK_HAS_USER_DATA
static struct rpc_xprt *sock_list = NULL;
#endif

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif
/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	xprt_transmit_status(struct rpc_task *task);
static void	xprt_receive_status(struct rpc_task *task);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_reconn_timeout(struct rpc_task *task);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
					struct rpc_timeout *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt *xprt;

	for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
		;
	return xprt;
#else
	return (struct rpc_xprt *) sk->user_data;
#endif
}
/*
 * Adjust the iovec to move on 'n' bytes
 */
extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
	struct iovec *iv = msg->msg_iov;

	/*
	 * Eat any sent iovecs
	 */
	while (iv->iov_len < amount) {
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}
	msg->msg_iov = niv;

	/*
	 * And chew down the partial one
	 */
	niv[0].iov_len = iv->iov_len - amount;
	niv[0].iov_base = ((unsigned char *) iv->iov_base) + amount;
	iv++;

	/*
	 * And copy any others
	 */
	for (amount = 1; amount < msg->msg_iovlen; amount++)
		niv[amount] = *iv++;
}
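/*
 * Worked example (editor's note): given source iovecs of 4, 8 and 12
 * bytes and amount = 10, xprt_move_iov() above consumes the 4-byte
 * iovec entirely (amount becomes 6, msg_iovlen drops by one), sets
 * niv[0] to point 6 bytes into the 8-byte iovec with iov_len = 2, and
 * copies the 12-byte iovec across unchanged.
 */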
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt)
{
	struct socket *sock = xprt->sock;
	struct msghdr msg;
	mm_segment_t oldfs;
	int result;
	struct iovec niv[MAX_IOVEC];

	xprt_pktdump("packet data:",
			xprt->snd_buf.io_vec->iov_base,
			xprt->snd_buf.io_vec->iov_len);

	msg.msg_flags = MSG_DONTWAIT;
	msg.msg_iov = xprt->snd_buf.io_vec;
	msg.msg_iovlen = xprt->snd_buf.io_nr;
	msg.msg_name = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;

	/* Don't repeat bytes */
	if (xprt->snd_sent)
		xprt_move_iov(&msg, niv, xprt->snd_sent);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
	set_fs(oldfs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n",
			xprt->snd_buf.io_len, result);

	if (result >= 0) {
		xprt->snd_buf.io_len -= result;
		xprt->snd_sent += result;
		return result;
	}

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
		break;
	case -EAGAIN:
		return 0;
	case -ENOTCONN: case -EPIPE:
		/* connection broken */
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
		result = 0;
	}
	return result;
}
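/*
 * Editor's note on the return convention above: a non-negative result
 * is the byte count actually sent (io_len is reduced and snd_sent
 * advanced, so a later call resumes mid-message via xprt_move_iov());
 * -EAGAIN is mapped to 0 so stream callers keep their partial-send
 * state; anything else is a hard error. For example, an 8192-byte
 * record that sends 4096 bytes leaves io_len = snd_sent = 4096 for
 * the next pass.
 */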
/*
 * Read data from socket
 */
static inline int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
{
	struct socket *sock = xprt->sock;
	struct sockaddr_in sin;
	struct msghdr msg;
	mm_segment_t oldfs;
	int result;

#if LINUX_VERSION_CODE >= 0x020100
	msg.msg_flags = MSG_DONTWAIT;
	msg.msg_iov = iov;
	msg.msg_iovlen = nr;
	msg.msg_name = &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);
#else
	int alen = sizeof(sin);
	msg.msg_flags = 0;
	msg.msg_iov = iov;
	msg.msg_iovlen = nr;
	msg.msg_name = &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
	set_fs(oldfs);
#endif

	if (!result && len)
		result = -EAGAIN;

	dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
			iov, len, result);
	return result;
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd = xprt->cwnd;

	if (xprt->nocong)
		return;
	if (result >= 0) {
		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
			return;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		else
			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}
	xprt->cwnd = cwnd;
}
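/*
 * Worked example (editor's note): RPC_CWNDSCALE represents one request
 * slot's worth of window (xprt_reserve_status() charges RPC_CWNDSCALE
 * of congestion per slot). On a success with cwnd = 2 * RPC_CWNDSCALE,
 * the additive-increase step above grows the window by roughly
 * RPC_CWNDSCALE^2 / cwnd = RPC_CWNDSCALE / 2, i.e. half a slot, so it
 * takes about two successful exchanges to admit one more request in
 * flight. A timeout halves the window (multiplicative decrease), never
 * dropping below one slot.
 */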
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_exponential)
		to->to_current <<= 1;
	else
		to->to_current += to->to_increment;
	if (to->to_maxval && to->to_current >= to->to_maxval) {
		to->to_current = to->to_maxval;
		to->to_retries = 0;
	}
	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return (to->to_retries)--;
}
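/*
 * Worked example (editor's note): with the UDP defaults set up by
 * xprt_set_timeout() below (to_initval = to_increment = 5s,
 * to_maxval = 25s, to_exponential = 0, 5 retries), to_current steps
 * through 10s, 15s, 20s and is then capped at 25s, where to_retries
 * is forced to 0 so the post-decremented return value tells the
 * caller to stop retransmitting.
 */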
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct sock *sk = xprt->inet;

#ifdef SOCK_HAS_USER_DATA
	sk->user_data = NULL;
#endif
	sk->data_ready = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space = xprt->old_write_space;

	if (xprt->file)
		fput(xprt->file);
	else
		sock_release(xprt->sock);
	/*
	 * TCP doesn't require the rpciod now - other things may,
	 * but rpciod handles that, not us.
	 */
	if (xprt->stream)
		rpciod_down();
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	rpc_wake_up_status(&xprt->sending, -ENOTCONN);
	xprt->connected = 0;
}
/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct socket *sock;
	struct sock *inet;
	int status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
			task->tk_pid, xprt, xprt->connected);
	task->tk_status = 0;

	if (xprt->connecting) {
		task->tk_timeout = xprt->timeout.to_maxval;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		return;
	}
	xprt->connecting = 1;

	/* Create an unconnected socket */
	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
		goto defer;

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif
	inet->data_ready = xprt->inet->data_ready;
	inet->state_change = xprt->inet->state_change;
	inet->write_space = xprt->inet->write_space;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data = xprt;
#endif

	dprintk("RPC: %4d closing old socket\n", task->tk_pid);
	xprt_disconnect(xprt);
	xprt_close(xprt);

	/* Reset to new socket and default congestion */
	xprt->sock = sock;
	xprt->inet = inet;
	xprt->cwnd = RPC_INITCWND;

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);
	if (status < 0) {
		if (status != -EINPROGRESS && status != -EALREADY) {
			printk("RPC: TCP connect error %d!\n", -status);
			goto defer;
		}

		dprintk("RPC: %4d connect status %d connected %d\n",
				task->tk_pid, status, xprt->connected);
		task->tk_timeout = 60 * HZ;

		start_bh_atomic();
		if (!xprt->connected) {
			rpc_sleep_on(&xprt->reconn, task,
					xprt_reconn_status, xprt_reconn_timeout);
			end_bh_atomic();
			return;
		}
		end_bh_atomic();
	}

	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	return;

defer:
	task->tk_timeout = 30 * HZ;
	rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
	xprt->connecting = 0;
}
/*
 * Reconnect status
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_status %d\n",
			task->tk_pid, task->tk_status);
	if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
		task->tk_timeout = 30 * HZ;
		rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
	}
}
/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_timeout(struct rpc_task *task)
{
	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
			task->tk_pid, task->tk_status);
	task->tk_status = -ENOTCONN;
	task->tk_xprt->connecting = 0;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * Look up the RPC request corresponding to a reply.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task *head, *task;
	struct rpc_rqst *req;
	int safe = 0;

	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				return req;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				return NULL;
			}
		} while (task != head);
	}
	dprintk("RPC: unknown XID %08x in reply.\n", xid);
	return NULL;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task *task = req->rq_task;

	req->rq_rlen = copied;
	req->rq_gotit = 1;

	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long nextstat = 0;
		static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
				jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	/* ... and wake up the process. */
	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;

	rpc_wake_up_task(task);
	return;
}
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static inline void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	struct iovec iov[MAX_IOVEC];
	int err, repsize, copied;

	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		return;
	repsize = skb->len - 8;	/* don't account for UDP header */

	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up the request corresponding to the given XID */
	if (!(rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + 8))))
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:", (u32 *) (skb->h.raw+8), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	/* Okay, we have it. Copy datagram... */
	memcpy(iov, rovr->rq_rvec, rovr->rq_rnr * sizeof(iov[0]));
	/* This needs to stay tied with the usermode skb_copy_datagram... */
	memcpy_tokerneliovec(iov, skb->data+8, copied);

	xprt_complete_rqst(xprt, rovr, copied);

dropit:
	skb_free_datagram(sk, skb);
	return;
}
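/*
 * Editor's note: the "+ 8" offsets above skip the 8-byte UDP header, so
 * for a datagram with skb->len = 152 the handler sees repsize = 144
 * bytes of RPC data, with the reply XID in the first 32-bit word.
 */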
/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom twice--
 * first receiving the record marker and XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static inline int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst *req;
	struct iovec *iov;
	struct iovec riov;
	u32 offset;
	int result, maxcpy, reclen, avail, want;

	dprintk("RPC: tcp_input_record\n");
	offset = xprt->tcp_offset;
	result = -EAGAIN;
	if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
		want = (xprt->tcp_more? 4 : 8) - offset;
		dprintk("RPC: reading header (%d bytes)\n", want);
		riov.iov_base = xprt->tcp_recm.data + offset;
		riov.iov_len = want;
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		/* Get the record length and mask out the more_fragments bit */
		reclen = ntohl(xprt->tcp_reclen);
		dprintk("RPC: reclen %08x\n", reclen);
		xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
		if (!(reclen &= 0x7fffffff)) {
			printk(KERN_NOTICE "RPC: empty TCP record.\n");
			return -ENOTCONN;	/* break connection */
		}
		xprt->tcp_total += reclen;
		xprt->tcp_reclen = reclen;

		dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
			xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
		if (!xprt->tcp_copied
		 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
			iov = xprt->tcp_iovec;
			memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
#if 0
*(u32 *)iov->iov_base = req->rq_xid;
#endif
			iov->iov_base += 4;
			iov->iov_len -= 4;
			xprt->tcp_copied = 4;
			xprt->tcp_rqstp = req;
		}
	} else {
		reclen = xprt->tcp_reclen;
	}

	avail = reclen - (offset - 4);
	if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
	 && req->rq_task->tk_rpcwait == &xprt->pending) {
		want = MIN(req->rq_rlen - xprt->tcp_copied, avail);

		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);
		result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
		if (result < 0)
			goto done;
		xprt->tcp_copied += result;
		offset += result;
		avail -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
		if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
			dprintk("RPC: %4d received reply complete\n",
					req->rq_task->tk_pid);
			xprt_complete_rqst(xprt, req, xprt->tcp_total);
			xprt->tcp_copied = 0;
			xprt->tcp_rqstp = NULL;
		}
		/* Request must be re-encoded before retransmit */
		req->rq_damaged = 1;
	}

	/* Skip over any trailing bytes on short reads */
	while (avail) {
		static u8 dummy[64];

		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len = want;
		dprintk("RPC: TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		avail -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}
	}
	if (!xprt->tcp_more)
		xprt->tcp_total = 0;
	offset = 0;

done:
	dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
			offset, xprt->tcp_total, xprt->tcp_copied);
	xprt->tcp_offset = offset;
	return result;
}
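/*
 * Editor's sketch (not in the original): decoding a TCP record marker
 * the way tcp_input_record() does above. A raw marker of 0x800000a4,
 * for instance, announces a final fragment of 0xa4 = 164 bytes.
 */
#if 0
static int example_decode_marker(u32 raw, int *more)
{
	u32 reclen = ntohl(raw);

	*more = (reclen & 0x80000000)? 0 : 1;	/* high bit set: last fragment */
	return reclen & 0x7fffffff;		/* low 31 bits: fragment length */
}
#endif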
/*
 * TCP task queue stuff
 */

static struct rpc_xprt *rpc_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */

/*
 * This is protected from tcp_data_ready and the stack as it's run
 * inside of the RPC I/O daemon
 */
void rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 * Empty each pending socket
	 */
	while ((xprt = rpc_xprt_pending) != NULL) {
		int safe_retry = 0;

		rpc_xprt_pending = xprt->rx_pending;
		xprt->rx_pending_flag = 0;

		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do {
			if (safe_retry++ > 50)
				break;
			result = tcp_input_record(xprt);
		} while (result >= 0);

		switch (result) {
		case -EAGAIN:
			continue;
		case -ENOTCONN:
		case -EPIPE:
			xprt_disconnect(xprt);
			continue;
		default:
			printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
				result);
		}
	}
}
extern inline void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
/*
 * data_ready callback for TCP. We can't just jump into the
 * tcp recvmsg functions inside of the network receive bh or
 * bad things occur. We queue it to pick up after networking
 * is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt *xprt;

	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("Not a socket with xprt %p\n", sk);
		return;
	}
	dprintk("RPC: tcp_data_ready client %p\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
			sk->state, xprt->connected,
			sk->dead, sk->zapped);
	/*
	 * If we are not waiting for the RPC bh run then
	 * we are now
	 */
	if (!xprt->rx_pending_flag) {
		int start_queue = 0;

		dprintk("RPC: xprt queue %p\n", rpc_xprt_pending);
		if (rpc_xprt_pending == NULL)
			start_queue = 1;
		xprt->rx_pending_flag = 1;
		xprt->rx_pending = rpc_xprt_pending;
		rpc_xprt_pending = xprt;
		if (start_queue) {
			tcp_rpciod_queue();
			start_queue = 0;
		}
	} else
		dprintk("RPC: xprt queued already %p\n", xprt);
}
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
			sk->state, xprt->connected,
			sk->dead, sk->zapped);

	if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
		xprt->connected = 1;
		xprt->connecting = 0;
		rpc_wake_up(&xprt->reconn);
	} else if (sk->zapped) {
		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		rpc_wake_up_status(&xprt->sending, -ENOTCONN);
		rpc_wake_up_status(&xprt->reconn, -ENOTCONN);
	}
}
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt *xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if (xprt->snd_sent && xprt->snd_task)
		printk("write space\n");
	if (xprt->write_space == 0) {
		xprt->write_space = 1;
		if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task)) {
			if (xprt->snd_sent)
				printk("Write wakeup snd_sent =%d\n",
					xprt->snd_sent);
			rpc_wake_up_task(xprt->snd_task);
		}
	}
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req) {
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
	}

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * (Partly) transmit the RPC packet
 * Note that task->tk_status is either 0 or negative on return.
 * Only when the reply is received will the status be set to a
 * positive value.
 */
static inline int
xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int result;

	task->tk_status = 0;
	if ((result = xprt_sendmsg(xprt)) >= 0) {
		if (!xprt->snd_buf.io_len || !xprt->stream) {
			rpc_wake_up_next(&xprt->sending);
			return req->rq_slen;
		}
		result = -EAGAIN;
	} else if (xprt->stream) {
		if (result == -ENOTCONN || result == -EPIPE) {
			xprt_disconnect(xprt);
			result = -ENOTCONN;
		}
	}
	return task->tk_status = result;
}
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_timeout *timeo;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;
	/*DEBUG*/int ac_debug=xprt->snd_sent;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
			*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}

	/* If we're not already in the process of transmitting our call,
	 * set up everything as needed. */
	if (xprt->snd_task != task) {
		/* Write the record marker */
		if (xprt->stream) {
			u32 marker;

			if (!xprt->connected) {
				task->tk_status = -ENOTCONN;
				return;
			}
			marker = htonl(0x80000000|(req->rq_slen-4));
			*((u32 *) req->rq_svec[0].iov_base) = marker;
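			/* Worked example (editor's note): for a 100-byte
			 * send buffer, rq_slen = 100 and the marker is
			 * htonl(0x80000000 | 96): "last fragment" plus the
			 * 96 bytes that follow the 4-byte marker itself. */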
		}

		/* Reset timeout parameters */
		timeo = &req->rq_timeout;
		if (timeo->to_retries < 0) {
			dprintk("RPC: %4d xprt_transmit reset timeo\n",
					task->tk_pid);
			timeo->to_retries = xprt->timeout.to_retries;
			timeo->to_current = timeo->to_initval;
		}

#ifdef RPC_PROFILE
		req->rq_xtime = jiffies;
#endif
		req->rq_gotit = 0;

		if (xprt->snd_task) {
			dprintk("RPC: %4d TCP write queue full (task %d)\n",
					task->tk_pid, xprt->snd_task->tk_pid);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			return;
		}
		xprt->snd_buf = req->rq_snd_buf;
		xprt->snd_task = task;
		xprt->snd_sent = 0;
		/*DEBUG*/ac_debug = 0;
	}

	/* For fast networks/servers we have to put the request on
	 * the pending list now:
	 */
	start_bh_atomic();
	status = rpc_add_wait_queue(&xprt->pending, task);
	if (!status)
		task->tk_callback = NULL;
	end_bh_atomic();

	if (status) {
		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
		task->tk_status = status;
		return;
	}

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		xprt->write_space = 0;
		if (xprt_transmit_some(xprt, task) != -EAGAIN) {
			dprintk("RPC: %4d xmit complete\n", task->tk_pid);
			xprt->snd_task = NULL;
			if (ac_debug)
				printk("Partial xmit finished\n");
			return;
		}

		/*d*/printk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, xprt->snd_buf.io_len,
				req->rq_slen);
		task->tk_status = 0;
		start_bh_atomic();
		if (!xprt->write_space) {
			/* Remove from pending */
			rpc_remove_wait_queue(task);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			end_bh_atomic();
			return;
		}
		end_bh_atomic();
	}
}
/*
 * This callback is invoked when the sending task is forced to sleep
 * because the TCP write buffers are full
 */
static void
xprt_transmit_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_client->cl_xprt;

	dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
	if (xprt->snd_task == task) {
		if (task->tk_status < 0) {
			xprt->snd_task = NULL;
			xprt_disconnect(xprt);
		} else
			xprt_transmit(task);
	}
}
/*
 * Wait for the reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
	if (xprt->connected == 0) {
		task->tk_status = -ENOTCONN;
		return;
	}

	/*
	 * Wait until rq_gotit goes nonzero, or timeout elapsed.
	 */
	task->tk_timeout = req->rq_timeout.to_current;

	start_bh_atomic();
	if (!req->rq_gotit) {
		rpc_sleep_on(&xprt->pending, task,
				xprt_receive_status, xprt_timer);
	}
	end_bh_atomic();

	dprintk("RPC: %4d xprt_receive returns %d\n",
		task->tk_pid, task->tk_status);
}
static void
xprt_receive_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
		xprt->tcp_rqstp = NULL;
}
/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	/* We already have an initialized request. */
	if (task->tk_rqstp)
		return 0;

	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
		xprt_reserve_status(task);
		task->tk_timeout = 0;
	} else if (!task->tk_timeout) {
		task->tk_status = -ENOBUFS;
	} else {
		dprintk("RPC: xprt_reserve waiting on backlog\n");
		rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
	}
	dprintk("RPC: %4d xprt_reserve returns %d\n",
			task->tk_pid, task->tk_status);
	return task->tk_status;
}
/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (xprt->shutdown) {
		task->tk_status = -EIO;
	} else if (task->tk_status < 0) {
		/* NOP */
	} else if (task->tk_rqstp) {
		/* We've already been given a request slot: NOP */
	} else if (!RPCXPRT_CONGESTED(xprt)) {
		/* OK: There's room for us. Grab a free slot and bump
		 * congestion value */
		req = xprt->free;
		if (!req)
			goto bad_list;
		if (req->rq_xid)
			goto bad_used;
		xprt->free = req->rq_next;
		xprt->cong += RPC_CWNDSCALE;
		task->tk_rqstp = req;
		req->rq_next = NULL;
		xprt_request_init(task, xprt);
	} else {
		task->tk_status = -EAGAIN;
	}

	if (xprt->free && !RPCXPRT_CONGESTED(xprt))
		rpc_wake_up_next(&xprt->backlog);

	return;

bad_list:
	printk(KERN_ERR
		"RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
		task->tk_pid, xprt->cong, xprt->cwnd);
	rpc_debug = ~0;
	goto bummer;
bad_used:
	printk(KERN_ERR "RPC: used rqst slot %p on free list!\n", req);
bummer:
	task->tk_status = -EIO;
	xprt->free = NULL;
	return;
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;
	static u32 xid = 0;

	if (!xid)
		xid = jiffies;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	task->tk_status = 0;
	req->rq_gotit = 0;
	req->rq_timeout = xprt->timeout;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_xid = xid++;
}
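/*
 * Editor's note: the static counter above is seeded from jiffies on
 * first use so XIDs differ across boots, then simply incremented; a
 * seed of, say, 0x0001abcd hands out 0x0001abcd, 0x0001abce, ... to
 * successive requests, and xprt_lookup_rqst() matches replies by this
 * value.
 */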
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	start_bh_atomic();
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
#ifdef RPC_DEBUG
		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
#endif
		rpc_del_timer(task);
		rpc_remove_wait_queue(task);
	}
	end_bh_atomic();

	/* Decrease congestion value. */
	xprt->cong -= RPC_CWNDSCALE;

#if 0
	/* If congestion threshold is not yet reached, pass on the request slot.
	 * This looks kind of kludgy, but it guarantees backlogged requests
	 * are served in order.
	 * N.B. This doesn't look completely safe, as the task is still
	 * on the backlog list after wake-up.
	 */
	if (!RPCXPRT_CONGESTED(xprt)) {
		struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);

		if (next && next->tk_rqstp == 0) {
			xprt->cong += RPC_CWNDSCALE;
			next->tk_rqstp = req;
			xprt_request_init(next, xprt);
			return;
		}
	}
#endif

	req->rq_next = xprt->free;
	xprt->free = req;

	/* If not congested, wake up the next backlogged process */
	if (!RPCXPRT_CONGESTED(xprt))
		rpc_wake_up_next(&xprt->backlog);
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5, 5 * HZ);
	else
		xprt_set_timeout(to, 5, 15 * HZ);
}
/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current =
	to->to_initval =
	to->to_increment = incr;
	to->to_maxval = incr * retr;
	to->to_resrvval = incr * retr;
	to->to_retries = retr;
	to->to_exponential = 0;
}
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
			struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;
	struct sock *inet;
	int i;

	dprintk("RPC: setting up %s transport...\n",
			proto == IPPROTO_UDP? "UDP" : "TCP");

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt));	/* Nnnngh! */

	xprt->file = NULL;
	xprt->sock = sock;
	xprt->inet = inet;
	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	xprt->cwnd = RPC_INITCWND;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data = xprt;
#else
	xprt->link = sock_list;
	sock_list = xprt;
#endif
	xprt->old_data_ready = inet->data_ready;
	xprt->old_state_change = inet->state_change;
	xprt->old_write_space = inet->write_space;
	if (proto == IPPROTO_UDP) {
		inet->data_ready = udp_data_ready;
	} else {
		inet->data_ready = tcp_data_ready;
		inet->state_change = tcp_state_change;
		inet->write_space = tcp_write_space;
		xprt->nocong = 1;
	}
	xprt->connected = 1;

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
		xprt->timeout.to_resrvval = to->to_maxval << 1;
	} else {
		xprt_default_timeout(&xprt->timeout, xprt->prot);
	}

	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
	xprt->reconn = RPC_INIT_WAITQ("xprt_reconn");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	dprintk("RPC: created transport %p\n", xprt);

	/*
	 * TCP requires that the rpc I/O daemon be present
	 */
	if (proto == IPPROTO_TCP)
		rpciod_up();

	return xprt;
}
/*
 * Create and initialize an RPC client given an open file.
 * This is obsolete now.
 */
#if 0
struct rpc_xprt *
xprt_create(struct file *file, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct socket *sock;
	int proto;

	if (!file) {
		printk("RPC: file == NULL in xprt_create!\n");
		return NULL;
	}

	sock = &file->f_inode->u.socket_i;
	if (sock->ops->family != PF_INET) {
		printk(KERN_WARNING "RPC: only INET sockets supported\n");
		return NULL;
	}

	proto = (sock->type == SOCK_DGRAM)? IPPROTO_UDP : IPPROTO_TCP;
	if ((xprt = xprt_setup(sock, proto, ap, to)) != NULL) {
		xprt->file = file;
		file->f_count++;
	}

	return xprt;
}
#endif
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}
/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket *sock;
	int type, err;

	dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
			sap? ntohl(sap->sin_addr.s_addr) : 0,
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		return NULL;	/* no socket to release yet */
	}

	/* If the caller has root privs, bind to a reserved port */
	if (!current->fsuid && xprt_bindresvport(sock) < 0)
		goto failed;

	if (type == SOCK_STREAM && sap) {
		err = sock->ops->connect(sock, (struct sockaddr *) sap,
						sizeof(*sap), 0);
		if (err < 0) {
			printk("RPC: TCP connect failed (%d).\n", -err);
			goto failed;
		}
	}

	return sock;

failed:
	sock_release(sock);
	return NULL;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket *sock;
	struct rpc_xprt *xprt;

	dprintk("RPC: xprt_create_proto called\n");

	if (!(sock = xprt_create_socket(proto, sap, to)))
		return NULL;

	if (!(xprt = xprt_setup(sock, proto, sap, to)))
		sock_release(sock);

	return xprt;
}
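/*
 * Editor's sketch (not in the original): a typical caller builds the
 * server address and asks for a transport. The function name and port
 * are hypothetical; passing a NULL timeout selects the per-protocol
 * defaults via xprt_default_timeout().
 */
#if 0
static struct rpc_xprt *example_make_udp_xprt(u32 server_ip)
{
	struct sockaddr_in sin;

	memset(&sin, 0, sizeof(sin));
	sin.sin_family = AF_INET;
	sin.sin_port = htons(2049);		/* e.g. the NFS port */
	sin.sin_addr.s_addr = htonl(server_ip);

	return xprt_create_proto(IPPROTO_UDP, &sin, NULL);
}
#endif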
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	rpc_wake_up(&xprt->reconn);
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt **q;

	for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
		;
	if (!*q) {
		printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
		return -EIO;	/* why is there no EBUGGYSOFTWARE */
	}
	*q = xprt->link;
#endif

	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}