/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 */
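/*
 * Illustrative sketch of the slot lifecycle described above, from the
 * caller's side. (Most users go through rpc_call() and friends in
 * clnt.c rather than driving the transport directly, and the
 * xprt_call() mentioned above no longer exists as such -- its job is
 * split between xprt_transmit() and xprt_receive() below.)
 *
 *	if (xprt_reserve(task) >= 0) {	// grab a slot or sleep on backlog
 *		// ... encode the call into task->tk_rqstp ...
 *		xprt_transmit(task);	// send; task goes on xprt->pending
 *		xprt_receive(task);	// wait for matching XID or timeout
 *		xprt_release(task);	// free slot, wake a backlogged task
 *	}
 */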
#define __KERNEL_SYSCALLS__

#include <linux/version.h>
#include <linux/types.h>
#include <linux/malloc.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/unistd.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>

#include <asm/uaccess.h>

#define SOCK_HAS_USER_DATA
/*
 * Local variables
 */
#ifndef SOCK_HAS_USER_DATA
static struct rpc_xprt *	sock_list = NULL;
#endif

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif
/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	xprt_transmit_status(struct rpc_task *task);
static void	xprt_receive_status(struct rpc_task *task);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_reconn_timeout(struct rpc_task *task);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
					struct rpc_timeout *);
#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8	*buf = (u8 *) packet;
	int	j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt	*xprt;

	for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
		;
	return xprt;
#else
	return (struct rpc_xprt *) sk->user_data;
#endif
}
/*
 * Adjust the iovec to move on 'n' bytes
 */
extern inline void xprt_move_iov(struct msghdr *msg, struct iovec *niv, int amount)
{
	struct iovec *iv=msg->msg_iov;

	/*
	 * Eat any sent iovecs
	 */
	while (iv->iov_len < amount)
	{
		amount -= iv->iov_len;
		iv++;
		msg->msg_iovlen--;
	}

	msg->msg_iov=niv;

	/*
	 * And chew down the partial one
	 */
	niv[0].iov_len = iv->iov_len-amount;
	niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
	iv++;

	/*
	 * And copy any others
	 */
	for (amount = 1; amount < msg->msg_iovlen; amount++)
		niv[amount] = *iv++;
}
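/*
 * Example (illustrative): with two iovecs of 100 and 400 bytes and
 * snd_sent == 150, the first vector is consumed entirely and the copy
 * resumes 50 bytes into the second, leaving a single 350-byte vector
 * in niv[].
 */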
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
	struct iovec	niv[MAX_IOVEC];

	xprt_pktdump("packet data:",
				xprt->snd_buf.io_vec->iov_base,
				xprt->snd_buf.io_vec->iov_len);

	msg.msg_flags   = MSG_DONTWAIT;
	msg.msg_iov	= xprt->snd_buf.io_vec;
	msg.msg_iovlen	= xprt->snd_buf.io_nr;
	msg.msg_name	= (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;

	/* Don't repeat bytes we have already sent */
	if (xprt->snd_sent)
		xprt_move_iov(&msg, niv, xprt->snd_sent);

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
	set_fs(oldfs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n",
			xprt->snd_buf.io_len, result);

	if (result >= 0) {
		xprt->snd_buf.io_len -= result;
		xprt->snd_sent += result;
		return result;
	}

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
		break;
	case -EAGAIN:
		return 0;
	case -ENOTCONN: case -EPIPE:
		/* connection broken */
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
		result = 0;
	}
	return result;
}
/*
 * Read data from socket
 */
static inline int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
{
	struct socket	*sock = xprt->sock;
	struct sockaddr_in sin;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;

#if LINUX_VERSION_CODE >= 0x020100
	msg.msg_flags   = MSG_DONTWAIT;
	msg.msg_iov	= iov;
	msg.msg_iovlen	= nr;
	msg.msg_name	= &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);
#else
	int		alen = sizeof(sin);

	msg.msg_flags   = 0;
	msg.msg_iov	= iov;
	msg.msg_iovlen	= nr;
	msg.msg_name	= &sin;
	msg.msg_namelen = sizeof(sin);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
	set_fs(oldfs);
#endif

	if (!result && len)
		result = -EAGAIN;

	dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
			iov, len, result);
	return result;
}
/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd = xprt->cwnd;

	if (xprt->nocong)
		return;
	if (result >= 0) {
		if (xprt->cong < cwnd || jiffies < xprt->congtime)
			return;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		else
			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}

	xprt->cwnd = cwnd;
}
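/*
 * Illustration (assuming RPC_CWNDSCALE is 256, i.e. one scale unit per
 * outstanding request, as defined in sunrpc/xprt.h): at cwnd = 1024 a
 * successful reply grows the window by (256*256 + 512)/1024 = 64, so it
 * takes roughly four such updates to admit one more request in flight,
 * while a timeout halves the window at once -- the familiar AIMD
 * pattern.
 */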
/*
 * Adjust timeout values etc for next retransmit
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_exponential)
		to->to_current <<= 1;
	else
		to->to_current += to->to_increment;
	if (to->to_maxval && to->to_current >= to->to_maxval) {
		to->to_current = to->to_maxval;
		to->to_retries = 0;
	}
	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return (to->to_retries)--;
}
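/*
 * Example: with the UDP defaults set up by xprt_default_timeout() below
 * (to_initval = to_increment = 5*HZ, to_maxval = 25*HZ, to_retries = 5,
 * non-exponential), to_current runs 5s, 10s, 15s, 20s, 25s across
 * successive retransmits; once the cap is hit the retry count is zeroed
 * and the next timeout is final.
 */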
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct sock	*sk = xprt->inet;

#ifdef SOCK_HAS_USER_DATA
	sk->user_data    = NULL;
#endif
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;

	if (xprt->file)
		fput(xprt->file);
	else
		sock_release(xprt->sock);
	/*
	 * TCP doesn't require rpciod any more - other things may,
	 * but rpciod handles that, not us.
	 */
	if (xprt->stream)
		rpciod_down();
}
/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	rpc_wake_up_status(&xprt->sending, -ENOTCONN);
	xprt->connected = 0;
}
/*
 * Reconnect a broken TCP connection.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock;
	struct sock	*inet;
	int		status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
				task->tk_pid, xprt, xprt->connected);
	task->tk_status = 0;

	if (xprt->connecting) {
		task->tk_timeout = xprt->timeout.to_maxval;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		return;
	}
	xprt->connecting = 1;

	/* Create an unconnected socket */
	if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
		goto defer;

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif
	inet->data_ready   = xprt->inet->data_ready;
	inet->state_change = xprt->inet->state_change;
	inet->write_space  = xprt->inet->write_space;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data    = xprt;
#endif

	dprintk("RPC: %4d closing old socket\n", task->tk_pid);
	xprt_disconnect(xprt);
	xprt_close(xprt);

	/* Reset to new socket and default congestion */
	xprt->sock = sock;
	xprt->inet = inet;
	xprt->cwnd = RPC_INITCWND;

	/* Now connect it asynchronously. */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
				sizeof(xprt->addr), O_NONBLOCK);
	if (status < 0) {
		if (status != -EINPROGRESS && status != -EALREADY) {
			printk("RPC: TCP connect error %d!\n", -status);
			goto defer;
		}

		dprintk("RPC: %4d connect status %d connected %d\n",
				task->tk_pid, status, xprt->connected);
		task->tk_timeout = 60 * HZ;

		start_bh_atomic();
		if (!xprt->connected) {
			rpc_sleep_on(&xprt->reconn, task,
				xprt_reconn_status, xprt_reconn_timeout);
			end_bh_atomic();
			return;
		}
		end_bh_atomic();
	}

	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	return;

defer:
	task->tk_timeout = 30 * HZ;
	rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
	xprt->connecting = 0;
}
/*
 * Reconnect status
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_status %d\n",
				task->tk_pid, task->tk_status);
	if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
		task->tk_timeout = 30 * HZ;
		rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
	}
}
/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_timeout(struct rpc_task *task)
{
	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
				task->tk_pid, task->tk_status);
	task->tk_status = -ENOTCONN;
	task->tk_xprt->connecting = 0;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * Look up the RPC request corresponding to a reply.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task	*head, *task;
	struct rpc_rqst	*req;
	int		safe = 0;

	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				return req;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				return NULL;
			}
		} while (task != head);
	}
	dprintk("RPC: unknown XID %08x in reply.\n", xid);
	return NULL;
}
/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;

	req->rq_rlen  = copied;
	req->rq_gotit = 1;

	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (nextstat < jiffies) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	/* ... and wake up the process. */
	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;

	rpc_wake_up_task(task);
	return;
}
/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static inline void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*rovr;
	struct sk_buff	*skb;
	struct iovec	iov[MAX_IOVEC];
	mm_segment_t	oldfs;
	int		err, repsize, copied;

	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		return;
	repsize = skb->len - 8;	/* don't account for UDP header */

	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up the request corresponding to the given XID. The XID is
	 * the first word of the RPC reply, which starts 8 bytes past
	 * skb->h.raw because of the UDP header. */
	if (!(rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + 8))))
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:", (u32 *) (skb->h.raw+8), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	/* Okay, we have it. Copy datagram... */
	memcpy(iov, rovr->rq_rvec, rovr->rq_rnr * sizeof(iov[0]));
	oldfs = get_fs(); set_fs(get_ds());
	skb_copy_datagram_iovec(skb, 8, iov, copied);
	set_fs(oldfs);

	xprt_complete_rqst(xprt, rovr, copied);

dropit:
	skb_free_datagram(sk, skb);
	return;
}
/*
 * TCP record receive routine.
 * This is not the most efficient code, since we call recvfrom twice--
 * first receiving the record marker and XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
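/*
 * For reference: each record on the stream is preceded by a 4-byte
 * marker whose top bit is the "last fragment" flag and whose low 31
 * bits give the fragment length. A request whose send buffer totals
 * 100 bytes (marker included) is announced by xprt_transmit() below as
 * htonl(0x80000000 | 96).
 */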
static inline int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req;
	struct iovec	*iov;
	struct iovec	riov;
	u32		offset;
	int		result, maxcpy, reclen, avail, want;

	dprintk("RPC: tcp_input_record\n");
	offset = xprt->tcp_offset;
	result = -EAGAIN;
	if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
		want = (xprt->tcp_more? 4 : 8) - offset;
		dprintk("RPC: reading header (%d bytes)\n", want);
		riov.iov_base = xprt->tcp_recm.data + offset;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		/* Get the record length and mask out the more_fragments bit */
		reclen = ntohl(xprt->tcp_reclen);
		dprintk("RPC: reclen %08x\n", reclen);
		xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
		if (!(reclen &= 0x7fffffff)) {
			printk(KERN_NOTICE "RPC: empty TCP record.\n");
			return -ENOTCONN;	/* break connection */
		}
		xprt->tcp_total += reclen;
		xprt->tcp_reclen = reclen;

		dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
			xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
		if (!xprt->tcp_copied
		 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
			iov = xprt->tcp_iovec;
			memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
#if 0
			*(u32 *)iov->iov_base = req->rq_xid;
#endif
			iov->iov_base += 4;
			iov->iov_len  -= 4;
			xprt->tcp_copied = 4;
			xprt->tcp_rqstp  = req;
		}
	} else {
		reclen = xprt->tcp_reclen;
	}

	avail = reclen - (offset - 4);
	if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
	 && req->rq_task->tk_rpcwait == &xprt->pending) {
		want = MIN(req->rq_rlen - xprt->tcp_copied, avail);

		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);
		result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
		if (result < 0)
			goto done;
		xprt->tcp_copied += result;
		offset += result;
		avail  -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}

		maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
		if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
			dprintk("RPC: %4d received reply complete\n",
					req->rq_task->tk_pid);
			xprt_complete_rqst(xprt, req, xprt->tcp_total);
			xprt->tcp_copied = 0;
			xprt->tcp_rqstp  = NULL;
		}
		/* Request must be re-encoded before retransmit */
		req->rq_damaged = 1;
	}

	/* Skip over any trailing bytes on short reads */
	while (avail) {
		static u8	dummy[64];

		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len  = want;
		dprintk("RPC: TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want);
		if (result < 0)
			goto done;
		offset += result;
		avail  -= result;
		if (result < want) {
			result = -EAGAIN;
			goto done;
		}
	}
	if (!xprt->tcp_more)
		xprt->tcp_total = 0;
	offset = 0;

done:
	dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
			offset, xprt->tcp_total, xprt->tcp_copied);
	xprt->tcp_offset = offset;
	return result;
}
/*
 * TCP task queue stuff
 */

static struct rpc_xprt *rpc_xprt_pending = NULL;	/* Chain by rx_pending of rpc_xprt's */

/*
 * This is protected from tcp_data_ready and the stack as it's run
 * inside of the RPC I/O daemon
 */
void rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 * Empty each pending socket
	 */
	while ((xprt = rpc_xprt_pending) != NULL)
	{
		int safe_retry = 0;

		rpc_xprt_pending = xprt->rx_pending;
		xprt->rx_pending_flag = 0;

		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do
		{
			if (safe_retry++ > 50)
				break;
			result = tcp_input_record(xprt);
		}
		while (result >= 0);

		switch (result) {
		case -EAGAIN:
			continue;
		case -ENOTCONN:
		case -EPIPE:
			xprt_disconnect(xprt);
			continue;
		default:
			printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
				result);
		}
	}
}
extern inline void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
/*
 * data_ready callback for TCP. We can't just jump into the
 * tcp recvmsg functions inside of the network receive bh or
 * bad things occur. We queue it to pick up after networking
 * is done.
 */
static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt	*xprt;

	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
	{
		printk("Not a socket with xprt %p\n", sk);
		return;
	}
	dprintk("RPC: tcp_data_ready client %p\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);
	/*
	 * If we are not already waiting for the RPC bh run then
	 * we are now
	 */
	if (!xprt->rx_pending_flag)
	{
		dprintk("RPC: xprt queue\n");
		if (rpc_xprt_pending == NULL)
			tcp_rpciod_queue();
		xprt->rx_pending_flag = 1;
		xprt->rx_pending = rpc_xprt_pending;
		rpc_xprt_pending = xprt;
	}
	else
		dprintk("RPC: xprt queued already %p\n", xprt);
}
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
				sk->state, xprt->connected,
				sk->dead, sk->zapped);

	if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
		xprt->connected = 1;
		xprt->connecting = 0;
		rpc_wake_up(&xprt->reconn);
	} else if (sk->zapped) {
		rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		rpc_wake_up_status(&xprt->sending, -ENOTCONN);
		rpc_wake_up_status(&xprt->reconn,  -ENOTCONN);
	}
}
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if (xprt->snd_sent && xprt->snd_task)
		printk("write space\n");
	if (xprt->write_space == 0)
	{
		xprt->write_space = 1;
		if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
		{
			if (xprt->snd_sent)
				printk("Write wakeup snd_sent =%d\n",
					xprt->snd_sent);
			rpc_wake_up_task(xprt->snd_task);
		}
	}
}
/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	if (task->tk_rqstp)
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

	dprintk("RPC: %4d xprt_timer (%s request)\n", task->tk_pid,
			task->tk_rqstp? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
/*
 * (Partly) transmit the RPC packet
 * Note that task->tk_status is either 0 or negative on return.
 * Only when the reply is received will the status be set to a
 * positive value.
 */
static inline int
xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	int		result;

	task->tk_status = 0;
	if ((result = xprt_sendmsg(xprt)) >= 0) {
		if (!xprt->snd_buf.io_len || !xprt->stream) {
			rpc_wake_up_next(&xprt->sending);
			return req->rq_slen;
		}
		result = -EAGAIN;
	} else if (xprt->stream) {
		if (result == -ENOTCONN || result == -EPIPE) {
			xprt_disconnect(xprt);
			result = -ENOTCONN;
		}
	}
	return task->tk_status = result;
}
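/*
 * Note (illustrative): on a datagram transport any successful sendmsg()
 * counts as a complete transmission (the !xprt->stream test above), so
 * the next sender is woken immediately; on TCP a short write leaves
 * io_len non-zero and the caller must retry once write space returns.
 */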
/*
 * Place the actual RPC call.
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void
xprt_transmit(struct rpc_task *task)
{
	struct rpc_timeout *timeo;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	/*DEBUG*/int ac_debug=xprt->snd_sent;

	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
				*(u32 *)(req->rq_svec[0].iov_base));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}

	/* If we're not already in the process of transmitting our call,
	 * set up everything as needed. */
	if (xprt->snd_task != task) {
		/* Write the record marker */
		if (xprt->stream) {
			u32	marker;

			if (!xprt->connected) {
				task->tk_status = -ENOTCONN;
				return;
			}
			marker = htonl(0x80000000|(req->rq_slen-4));
			*((u32 *) req->rq_svec[0].iov_base) = marker;
		}

		/* Reset timeout parameters */
		timeo = &req->rq_timeout;
		if (timeo->to_retries < 0) {
			dprintk("RPC: %4d xprt_transmit reset timeo\n",
						task->tk_pid);
			timeo->to_retries = xprt->timeout.to_retries;
			timeo->to_current = timeo->to_initval;
		}

#ifdef RPC_PROFILE
		req->rq_xtime = jiffies;
#endif
		req->rq_gotit = 0;

		if (xprt->snd_task) {
			dprintk("RPC: %4d TCP write queue full (task %d)\n",
					task->tk_pid, xprt->snd_task->tk_pid);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			return;
		}
		xprt->snd_buf  = req->rq_snd_buf;
		xprt->snd_task = task;
		xprt->snd_sent = 0;
		/*DEBUG*/ac_debug = 0;
	}

	/* For fast networks/servers we have to put the request on
	 * the pending list now:
	 */
	start_bh_atomic();
	rpc_add_wait_queue(&xprt->pending, task);
	task->tk_callback = NULL;
	end_bh_atomic();

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		xprt->write_space = 0;
		if (xprt_transmit_some(xprt, task) != -EAGAIN) {
			dprintk("RPC: %4d xmit complete\n", task->tk_pid);
			xprt->snd_task = NULL;
			if (ac_debug)
				printk("Partial xmit finished\n");
			return;
		}

		/*d*/printk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, xprt->snd_buf.io_len,
				req->rq_slen);
		task->tk_status = 0;
		start_bh_atomic();
		if (!xprt->write_space) {
			/* Remove from pending */
			rpc_remove_wait_queue(task);
			rpc_sleep_on(&xprt->sending, task,
					xprt_transmit_status, NULL);
			end_bh_atomic();
			return;
		}
		end_bh_atomic();
	}
}
/*
 * This callback is invoked when the sending task is forced to sleep
 * because the TCP write buffers are full
 */
static void
xprt_transmit_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_client->cl_xprt;

	dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
	if (xprt->snd_task == task)
	{
		if (task->tk_status < 0)
		{
			xprt->snd_task = NULL;
			xprt_disconnect(xprt);
		}
		else
			xprt_transmit(task);
	}
}
/*
 * Wait for the reply to our call.
 * When the callback is invoked, the congestion window should have
 * been updated already.
 */
void
xprt_receive(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;

	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
	if (xprt->connected == 0) {
		task->tk_status = -ENOTCONN;
		return;
	}

	/*
	 * Wait until rq_gotit goes nonzero, or the timeout elapses.
	 */
	task->tk_timeout = req->rq_timeout.to_current;

	start_bh_atomic();
	if (!req->rq_gotit) {
		rpc_sleep_on(&xprt->pending, task,
				xprt_receive_status, xprt_timer);
	}
	end_bh_atomic();

	dprintk("RPC: %4d xprt_receive returns %d\n",
				task->tk_pid, task->tk_status);
}
static void
xprt_receive_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
		xprt->tcp_rqstp = NULL;
}
/*
 * Reserve an RPC call slot.
 */
int
xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	/* We already have an initialized request. */
	if (task->tk_rqstp)
		return 0;

	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
				task->tk_pid, xprt->cong, xprt->cwnd);
	if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
		xprt_reserve_status(task);
		task->tk_timeout = 0;
	} else if (!task->tk_timeout) {
		task->tk_status = -ENOBUFS;
	} else {
		dprintk("RPC: xprt_reserve waiting on backlog\n");
		rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
	}
	dprintk("RPC: %4d xprt_reserve returns %d\n",
				task->tk_pid, task->tk_status);
	return task->tk_status;
}
/*
 * Reservation callback
 */
static void
xprt_reserve_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (xprt->shutdown) {
		task->tk_status = -EIO;
	} else if (task->tk_status < 0) {
		/* NOP */
	} else if (task->tk_rqstp) {
		/* We've already been given a request slot: NOP */
	} else if (!RPCXPRT_CONGESTED(xprt)) {
		/* OK: There's room for us. Grab a free slot and bump
		 * congestion value */
		req = xprt->free;
		if (!req)
			goto bad_list;
		if (req->rq_xid)
			goto bad_used;
		xprt->free = req->rq_next;
		xprt->cong += RPC_CWNDSCALE;
		task->tk_rqstp = req;
		req->rq_next = NULL;
		xprt_request_init(task, xprt);
	} else {
		task->tk_status = -EAGAIN;
	}

	if (xprt->free && !RPCXPRT_CONGESTED(xprt))
		rpc_wake_up_next(&xprt->backlog);

	return;

bad_list:
	printk("RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
		task->tk_pid, xprt->cong, xprt->cwnd);
	rpc_debug = ~0;
	goto bummer;
bad_used:
	printk("RPC: used rqst slot %p on free list!\n", req);
bummer:
	task->tk_status = -EIO;
	xprt->free = NULL;
	return;
}
/*
 * Initialize RPC request
 */
static void
xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	static u32	xid = 0;

	if (!xid)
		xid = jiffies;

	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
	task->tk_status = 0;
	req->rq_gotit	= 0;
	req->rq_timeout = xprt->timeout;
	req->rq_task	= task;
	req->rq_xprt	= xprt;
	req->rq_xid	= xid++;
}
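/*
 * Note: the XID counter is a single static seeded from jiffies on first
 * use, so XIDs are handed out sequentially across all transports rather
 * than per socket; replies are matched to requests purely by value in
 * xprt_lookup_rqst().
 */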
/*
 * Release an RPC call slot
 */
void
xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (!(req = task->tk_rqstp))
		return;
	task->tk_rqstp = NULL;
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

	/* remove slot from queue of pending */
	start_bh_atomic();
	if (task->tk_rpcwait) {
		printk("RPC: task of released request still queued!\n");
#ifdef RPC_DEBUG
		printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
#endif
		rpc_del_timer(task);
		rpc_remove_wait_queue(task);
	}
	end_bh_atomic();

	/* Decrease congestion value. If congestion threshold is not yet
	 * reached, pass on the request slot.
	 * This looks kind of kludgy, but it guarantees backlogged requests
	 * are served in order.
	 */
	xprt->cong -= RPC_CWNDSCALE;
	if (!RPCXPRT_CONGESTED(xprt)) {
		struct rpc_task	*next = rpc_wake_up_next(&xprt->backlog);

		if (next && next->tk_rqstp == 0) {
			xprt->cong += RPC_CWNDSCALE;
			next->tk_rqstp = req;
			xprt_request_init(next, xprt);
			return;
		}
	}

	req->rq_next = xprt->free;
	xprt->free   = req;
}
/*
 * Set default timeout parameters
 */
void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
	if (proto == IPPROTO_UDP)
		xprt_set_timeout(to, 5,  5 * HZ);
	else
		xprt_set_timeout(to, 5, 15 * HZ);
}
/*
 * Set constant timeout
 */
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
	to->to_current   =
	to->to_initval   =
	to->to_increment = incr;
	to->to_maxval    = incr * retr;
	to->to_resrvval  = incr * retr;
	to->to_retries   = retr;
	to->to_exponential = 0;
}
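/*
 * Usage sketch: xprt_set_timeout(&to, 5, 10 * HZ) gives an initial 10s
 * timeout, 5 retries and a 50s cap. Despite the name, the timeout is
 * not constant across retransmits -- xprt_adjust_timeout() above still
 * grows to_current by to_increment (here 10s) on every pass until
 * to_maxval is reached.
 */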
/*
 * Initialize an RPC client
 */
static struct rpc_xprt *
xprt_setup(struct socket *sock, int proto,
			struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	struct sock	*inet;
	int		i;

	dprintk("RPC: setting up %s transport...\n",
				proto == IPPROTO_UDP? "UDP" : "TCP");

#if LINUX_VERSION_CODE >= 0x020100
	inet = sock->sk;
#else
	inet = (struct sock *) sock->data;
#endif

	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
		return NULL;
	memset(xprt, 0, sizeof(*xprt));	/* Nnnngh! */

	xprt->file = NULL;
	xprt->sock = sock;
	xprt->inet = inet;
	xprt->addr = *ap;
	xprt->prot = proto;
	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
	xprt->cwnd = RPC_INITCWND;
#ifdef SOCK_HAS_USER_DATA
	inet->user_data = xprt;
#else
	xprt->link = sock_list;
	sock_list = xprt;
#endif
	xprt->old_data_ready	= inet->data_ready;
	xprt->old_state_change	= inet->state_change;
	xprt->old_write_space	= inet->write_space;
	if (proto == IPPROTO_UDP) {
		inet->data_ready = udp_data_ready;
	} else {
		inet->data_ready   = tcp_data_ready;
		inet->state_change = tcp_state_change;
		inet->write_space  = tcp_write_space;
		xprt->nocong = 1;
	}
	xprt->connected = 1;

	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
		xprt->timeout.to_current = to->to_initval;
		xprt->timeout.to_resrvval = to->to_maxval << 1;
	} else {
		xprt_default_timeout(&xprt->timeout, xprt->prot);
	}

	xprt->pending = RPC_INIT_WAITQ("xprt_pending");
	xprt->sending = RPC_INIT_WAITQ("xprt_sending");
	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");

	/* initialize free list */
	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
		req->rq_next = req + 1;
	req->rq_next = NULL;
	xprt->free = xprt->slot;

	dprintk("RPC: created transport %p\n", xprt);

	/*
	 * TCP requires that the RPC I/O daemon be present
	 */
	if (proto == IPPROTO_TCP)
		rpciod_up();
	return xprt;
}
/*
 * Create and initialize an RPC client given an open file.
 * This is obsolete now.
 */
#if 0
struct rpc_xprt *
xprt_create(struct file *file, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;
	int		proto;

	if (!file) {
		printk("RPC: file == NULL in xprt_create!\n");
		return NULL;
	}

	sock = &file->f_inode->u.socket_i;
	if (sock->ops->family != PF_INET) {
		printk(KERN_WARNING "RPC: only INET sockets supported\n");
		return NULL;
	}

	proto = (sock->type == SOCK_DGRAM)? IPPROTO_UDP : IPPROTO_TCP;
	if ((xprt = xprt_setup(sock, proto, ap, to)) != NULL) {
		xprt->file = file;
		file->f_count++;
	}

	return xprt;
}
#endif
/*
 * Bind to a reserved port
 */
static inline int
xprt_bindresvport(struct socket *sock)
{
	struct sockaddr_in myaddr;
	int		err, port;

	memset(&myaddr, 0, sizeof(myaddr));
	myaddr.sin_family = AF_INET;
	port = 800;
	do {
		myaddr.sin_port = htons(port);
		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
	} while (err == -EADDRINUSE && --port > 0);

	if (err < 0)
		printk("RPC: Can't bind to reserved port (%d).\n", -err);

	return err;
}
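/*
 * Note: ports below 1024 can only be bound by privileged processes,
 * which is all that many NFS servers check for. The scan starts at 800
 * and works downward on EADDRINUSE, staying clear of the well-known
 * service ports just below 1024.
 */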
/*
 * Create a client socket given the protocol and peer address.
 */
static struct socket *
xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	int		type, err;

	dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
			sap? ntohl(sap->sin_addr.s_addr) : 0,
			(proto == IPPROTO_UDP)? "udp" : "tcp", proto);

	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
		printk("RPC: can't create socket (%d).\n", -err);
		goto failed;
	}

	/* If the caller has root privs, bind to a reserved port */
	if (!current->fsuid && xprt_bindresvport(sock) < 0)
		goto failed;

	if (type == SOCK_STREAM && sap) {
		err = sock->ops->connect(sock, (struct sockaddr *) sap,
						sizeof(*sap), 0);
		if (err < 0) {
			printk("RPC: TCP connect failed (%d).\n", -err);
			goto failed;
		}
	}

	return sock;

failed:
	sock_release(sock);
	return NULL;
}
/*
 * Create an RPC client transport given the protocol and peer address.
 */
struct rpc_xprt *
xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
	struct socket	*sock;
	struct rpc_xprt	*xprt;

	dprintk("RPC: xprt_create_proto called\n");

	if (!(sock = xprt_create_socket(proto, sap, to)))
		return NULL;

	if (!(xprt = xprt_setup(sock, proto, sap, to)))
		sock_release(sock);

	return xprt;
}
/*
 * Prepare for transport shutdown.
 */
void
xprt_shutdown(struct rpc_xprt *xprt)
{
	xprt->shutdown = 1;
	rpc_wake_up(&xprt->sending);
	rpc_wake_up(&xprt->pending);
	rpc_wake_up(&xprt->backlog);
	rpc_wake_up(&xprt->reconn);
}
/*
 * Destroy an RPC transport, killing off all requests.
 */
int
xprt_destroy(struct rpc_xprt *xprt)
{
#ifndef SOCK_HAS_USER_DATA
	struct rpc_xprt	**q;

	for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
		;
	if (!*q) {
		printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
		return -EIO;	/* why is there no EBUGGYSOFTWARE */
	}
	*q = xprt->link;
#endif

	dprintk("RPC: destroying transport %p\n", xprt);
	xprt_close(xprt);
	kfree(xprt);

	return 0;
}