Import 2.1.116pre2
[davej-history.git] / net / sunrpc / xprt.c
blobe2af81be4f9ebdb25f91ec59fb5b6840e9befa20
/*
 * linux/net/sunrpc/xprt.c
 *
 * This is a generic RPC call interface supporting congestion avoidance,
 * and asynchronous calls.
 *
 * The interface works like this:
 *
 * - When a process places a call, it allocates a request slot if
 *   one is available. Otherwise, it sleeps on the backlog queue
 *   (xprt_reserve).
 * - Next, the caller puts together the RPC message, stuffs it into
 *   the request struct, and calls xprt_call().
 * - xprt_call transmits the message and installs the caller on the
 *   socket's wait list. At the same time, it installs a timer that
 *   is run after the packet's timeout has expired.
 * - When a packet arrives, the data_ready handler walks the list of
 *   pending requests for that socket. If a matching XID is found, the
 *   caller is woken up, and the timer removed.
 * - When no reply arrives within the timeout interval, the timer is
 *   fired by the kernel and runs xprt_timer(). It either adjusts the
 *   timeout values (minor timeout) or wakes up the caller with a status
 *   of -ETIMEDOUT.
 * - When the caller receives a notification from RPC that a reply arrived,
 *   it should release the RPC slot, and process the reply.
 *   If the call timed out, it may choose to retry the operation by
 *   adjusting the initial timeout value, and simply calling rpc_call
 *   again.
 *
 * Support for async RPC is done through a set of RPC-specific scheduling
 * primitives that `transparently' work for processes as well as async
 * tasks that rely on callbacks.
 *
 * Copyright (C) 1995, 1996, Olaf Kirch <okir@monad.swb.de>
 *
 * TCP callback race fixes (C) 1998 Red Hat Software <alan@redhat.com>
 */
39 #define __KERNEL_SYSCALLS__
41 #include <linux/version.h>
42 #include <linux/types.h>
43 #include <linux/malloc.h>
44 #include <linux/sched.h>
45 #include <linux/errno.h>
46 #include <linux/socket.h>
47 #include <linux/in.h>
48 #include <linux/net.h>
49 #include <linux/mm.h>
50 #include <linux/udp.h>
51 #include <linux/unistd.h>
52 #include <linux/sunrpc/clnt.h>
53 #include <linux/file.h>
55 #include <net/sock.h>
57 #include <asm/uaccess.h>
#define SOCK_HAS_USER_DATA

/*
 * Local variables
 *
 * With SOCK_HAS_USER_DATA defined (as above) the transport is looked up
 * through sk->user_data, and this list is not compiled in.
 */
#ifndef SOCK_HAS_USER_DATA
static struct rpc_xprt *	sock_list = NULL;
#endif

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * MIN/MAX helpers. Each one is guarded on its own name so that a header
 * which happens to define only one of them cannot leave the other
 * undefined. Both evaluate their arguments twice: do not pass
 * expressions with side effects.
 */
#ifndef MAX
# define MAX(a, b)	((a) > (b)? (a) : (b))
#endif
#ifndef MIN
# define MIN(a, b)	((a) < (b)? (a) : (b))
#endif

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void	xprt_transmit_status(struct rpc_task *task);
static void	xprt_receive_status(struct rpc_task *task);
static void	xprt_reserve_status(struct rpc_task *task);
static void	xprt_reconn_timeout(struct rpc_task *task);
static void	xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct sockaddr_in *,
					struct rpc_timeout *);
90 #ifdef RPC_DEBUG_DATA
92 * Print the buffer contents (first 128 bytes only--just enough for
93 * diropres return).
95 static void
96 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
98 u8 *buf = (u8 *) packet;
99 int j;
101 dprintk("RPC: %s\n", msg);
102 for (j = 0; j < count && j < 128; j += 4) {
103 if (!(j & 31)) {
104 if (j)
105 dprintk("\n");
106 dprintk("0x%04x ", j);
108 dprintk("%02x%02x%02x%02x ",
109 buf[j], buf[j+1], buf[j+2], buf[j+3]);
111 dprintk("\n");
113 #else
114 static inline void
115 xprt_pktdump(char *msg, u32 *packet, unsigned int count)
117 /* NOP */
119 #endif
122 * Look up RPC transport given an INET socket
124 static inline struct rpc_xprt *
125 xprt_from_sock(struct sock *sk)
127 #ifndef SOCK_HAS_USER_DATA
128 struct rpc_xprt *xprt;
130 for (xprt = sock_list; xprt && sk != xprt->inet; xprt = xprt->link)
132 return xprt;
133 #else
134 return (struct rpc_xprt *) sk->user_data;
135 #endif
/*
 * Write data to socket.
 *
 * Sends what remains described by xprt->snd_buf to xprt->addr without
 * blocking, and subtracts the number of bytes accepted from
 * snd_buf.io_len.
 *
 * Returns the number of bytes sent. -ECONNREFUSED, -ENOTCONN and -EPIPE
 * are passed back to the caller. -EAGAIN (socket buffer full) and any
 * other error are reported as 0 bytes sent; only the latter is logged.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;

	xprt_pktdump("packet data:",
				xprt->snd_buf.io_vec->iov_base,
				xprt->snd_buf.io_vec->iov_len);

#if LINUX_VERSION_CODE >= 0x020100
	msg.msg_flags   = MSG_DONTWAIT;
	msg.msg_iov	= xprt->snd_buf.io_vec;
	msg.msg_iovlen	= xprt->snd_buf.io_nr;
	msg.msg_name	= (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;

	/* set_fs(get_ds()) lets the socket layer accept kernel buffers */
	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, xprt->snd_buf.io_len);
	set_fs(oldfs);
#else
	msg.msg_flags   = 0;
	msg.msg_iov	= xprt->snd_buf.io_vec;
	msg.msg_iovlen	= xprt->snd_buf.io_nr;
	msg.msg_name	= (struct sockaddr *) &xprt->addr;
	msg.msg_namelen = sizeof(xprt->addr);
	msg.msg_control = NULL;

	oldfs = get_fs(); set_fs(get_ds());
	result = sock->ops->sendmsg(sock, &msg, xprt->snd_buf.io_len, 1, 0);
	set_fs(oldfs);
#endif

	dprintk("RPC: xprt_sendmsg(%d) = %d\n",
				xprt->snd_buf.io_len, result);

	if (result >= 0) {
		xprt->snd_buf.io_len -= result;
		return result;
	}

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
		break;
	case -EAGAIN:
		/* The non-blocking send found the socket buffer full. This is
		 * ordinary flow control, not an error: report no progress
		 * (the same value the default case returns) without logging.
		 */
		result = 0;
		break;
	case -ENOTCONN: case -EPIPE:
		/* connection broken */
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
		result = 0;
	}
	return result;
}
202 * Read data from socket
204 static inline int
205 xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, int len)
207 struct socket *sock = xprt->sock;
208 struct sockaddr_in sin;
209 struct msghdr msg;
210 mm_segment_t oldfs;
211 int result;
213 #if LINUX_VERSION_CODE >= 0x020100
214 msg.msg_flags = MSG_DONTWAIT;
215 msg.msg_iov = iov;
216 msg.msg_iovlen = nr;
217 msg.msg_name = &sin;
218 msg.msg_namelen = sizeof(sin);
219 msg.msg_control = NULL;
221 oldfs = get_fs(); set_fs(get_ds());
222 result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
223 set_fs(oldfs);
224 #else
225 int alen = sizeof(sin);
226 msg.msg_flags = 0;
227 msg.msg_iov = iov;
228 msg.msg_iovlen = nr;
229 msg.msg_name = &sin;
230 msg.msg_namelen = sizeof(sin);
231 msg.msg_control = NULL;
233 oldfs = get_fs(); set_fs(get_ds());
234 result = sock->ops->recvmsg(sock, &msg, len, 1, 0, &alen);
235 set_fs(oldfs);
236 #endif
238 if (!result && len)
239 result = -EAGAIN;
241 dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
242 iov, len, result);
243 return result;
248 * Adjust RPC congestion window
249 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
251 static void
252 xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
254 unsigned long cwnd = xprt->cwnd;
256 if (xprt->nocong)
257 return;
258 if (result >= 0) {
259 if (xprt->cong < cwnd || jiffies < xprt->congtime)
260 return;
261 /* The (cwnd >> 1) term makes sure
262 * the result gets rounded properly. */
263 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
264 if (cwnd > RPC_MAXCWND)
265 cwnd = RPC_MAXCWND;
266 else
267 pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
268 xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
269 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
270 "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
271 (xprt->congtime-jiffies)*1000/HZ);
272 } else if (result == -ETIMEDOUT) {
273 if ((cwnd >>= 1) < RPC_CWNDSCALE)
274 cwnd = RPC_CWNDSCALE;
275 xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
276 dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
277 "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
278 (xprt->congtime-jiffies)*1000/HZ);
279 pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
282 xprt->cwnd = cwnd;
286 * Adjust timeout values etc for next retransmit
289 xprt_adjust_timeout(struct rpc_timeout *to)
291 if (to->to_exponential)
292 to->to_current <<= 1;
293 else
294 to->to_current += to->to_increment;
295 if (to->to_maxval && to->to_current >= to->to_maxval) {
296 to->to_current = to->to_maxval;
297 to->to_retries = 0;
299 if (!to->to_current) {
300 printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
301 to->to_current = 5 * HZ;
303 pprintk("RPC: %lu %s\n", jiffies,
304 to->to_retries? "retrans" : "timeout");
305 return (to->to_retries)--;
309 * Close down a transport socket
311 static void
312 xprt_close(struct rpc_xprt *xprt)
314 struct sock *sk = xprt->inet;
316 #ifdef SOCK_HAS_USER_DATA
317 sk->user_data = NULL;
318 #endif
319 sk->data_ready = xprt->old_data_ready;
320 sk->state_change = xprt->old_state_change;
321 sk->write_space = xprt->old_write_space;
323 if (xprt->file)
324 fput(xprt->file);
325 else
326 sock_release(xprt->sock);
328 * TCP doesnt require the rpciod now - other things may
329 * but rpciod handles that not us.
331 if(xprt->stream)
332 rpciod_down();
336 * Mark a transport as disconnected
338 static void
339 xprt_disconnect(struct rpc_xprt *xprt)
341 dprintk("RPC: disconnected transport %p\n", xprt);
342 rpc_wake_up_status(&xprt->pending, -ENOTCONN);
343 rpc_wake_up_status(&xprt->sending, -ENOTCONN);
344 xprt->connected = 0;
348 * Reconnect a broken TCP connection.
350 void
351 xprt_reconnect(struct rpc_task *task)
353 struct rpc_xprt *xprt = task->tk_xprt;
354 struct socket *sock;
355 struct sock *inet;
356 int status;
358 dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
359 task->tk_pid, xprt, xprt->connected);
360 task->tk_status = 0;
362 if (xprt->connecting) {
363 task->tk_timeout = xprt->timeout.to_maxval;
364 rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
365 return;
367 xprt->connecting = 1;
369 /* Create an unconnected socket */
370 if (!(sock = xprt_create_socket(xprt->prot, NULL, &xprt->timeout)))
371 goto defer;
373 #if LINUX_VERSION_CODE >= 0x020100
374 inet = sock->sk;
375 #else
376 inet = (struct sock *) sock->data;
377 #endif
378 inet->data_ready = xprt->inet->data_ready;
379 inet->state_change = xprt->inet->state_change;
380 inet->write_space = xprt->inet->write_space;
381 #ifdef SOCK_HAS_USER_DATA
382 inet->user_data = xprt;
383 #endif
385 dprintk("RPC: %4d closing old socket\n", task->tk_pid);
386 xprt_disconnect(xprt);
387 xprt_close(xprt);
389 /* Reset to new socket and default congestion */
390 xprt->sock = sock;
391 xprt->inet = inet;
392 xprt->cwnd = RPC_INITCWND;
394 /* Now connect it asynchronously. */
395 dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
396 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
397 sizeof(xprt->addr), O_NONBLOCK);
398 if (status < 0) {
399 if (status != -EINPROGRESS && status != -EALREADY) {
400 printk("RPC: TCP connect error %d!\n", -status);
401 goto defer;
404 dprintk("RPC: %4d connect status %d connected %d\n",
405 task->tk_pid, status, xprt->connected);
406 task->tk_timeout = 60 * HZ;
408 start_bh_atomic();
409 if (!xprt->connected) {
410 rpc_sleep_on(&xprt->reconn, task,
411 xprt_reconn_status, xprt_reconn_timeout);
412 end_bh_atomic();
413 return;
415 end_bh_atomic();
418 xprt->connecting = 0;
419 rpc_wake_up(&xprt->reconn);
420 return;
422 defer:
423 task->tk_timeout = 30 * HZ;
424 rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
425 xprt->connecting = 0;
429 * Reconnect status
431 static void
432 xprt_reconn_status(struct rpc_task *task)
434 struct rpc_xprt *xprt = task->tk_xprt;
436 dprintk("RPC: %4d xprt_reconn_status %d\n",
437 task->tk_pid, task->tk_status);
438 if (!xprt->connected && task->tk_status != -ETIMEDOUT) {
439 task->tk_timeout = 30 * HZ;
440 rpc_sleep_on(&xprt->reconn, task, NULL, xprt_reconn_timeout);
445 * Reconnect timeout. We just mark the transport as not being in the
446 * process of reconnecting, and leave the rest to the upper layers.
448 static void
449 xprt_reconn_timeout(struct rpc_task *task)
451 dprintk("RPC: %4d xprt_reconn_timeout %d\n",
452 task->tk_pid, task->tk_status);
453 task->tk_status = -ENOTCONN;
454 task->tk_xprt->connecting = 0;
455 task->tk_timeout = 0;
456 rpc_wake_up_task(task);
460 * Look up the RPC request corresponding to a reply.
462 static inline struct rpc_rqst *
463 xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
465 struct rpc_task *head, *task;
466 struct rpc_rqst *req;
467 int safe = 0;
469 if ((head = xprt->pending.task) != NULL) {
470 task = head;
471 do {
472 if ((req = task->tk_rqstp) && req->rq_xid == xid)
473 return req;
474 task = task->tk_next;
475 if (++safe > 100) {
476 printk("xprt_lookup_rqst: loop in Q!\n");
477 return NULL;
479 } while (task != head);
481 dprintk("RPC: unknown XID %08x in reply.\n", xid);
482 return NULL;
486 * Complete reply received.
487 * The TCP code relies on us to remove the request from xprt->pending.
489 static inline void
490 xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
492 struct rpc_task *task = req->rq_task;
494 req->rq_rlen = copied;
495 req->rq_gotit = 1;
497 /* Adjust congestion window */
498 xprt_adjust_cwnd(xprt, copied);
500 #ifdef RPC_PROFILE
501 /* Profile only reads for now */
502 if (copied > 1024) {
503 static unsigned long nextstat = 0;
504 static unsigned long pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;
506 pkt_cnt++;
507 pkt_len += req->rq_slen + copied;
508 pkt_rtt += jiffies - req->rq_xtime;
509 if (nextstat < jiffies) {
510 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
511 printk("RPC: %ld %ld %ld %ld stat\n",
512 jiffies, pkt_cnt, pkt_len, pkt_rtt);
513 pkt_rtt = pkt_len = pkt_cnt = 0;
514 nextstat = jiffies + 5 * HZ;
517 #endif
519 /* ... and wake up the process. */
520 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
521 task->tk_status = copied;
523 rpc_wake_up_task(task);
524 return;
528 * Input handler for RPC replies. Called from a bottom half and hence
529 * atomic.
531 static inline void
532 udp_data_ready(struct sock *sk, int len)
534 struct rpc_task *task;
535 struct rpc_xprt *xprt;
536 struct rpc_rqst *rovr;
537 struct sk_buff *skb;
538 struct iovec iov[MAX_IOVEC];
539 mm_segment_t oldfs;
540 int err, repsize, copied;
542 dprintk("RPC: udp_data_ready...\n");
543 if (!(xprt = xprt_from_sock(sk)))
544 return;
545 dprintk("RPC: udp_data_ready client %p\n", xprt);
547 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
548 return;
549 repsize = skb->len - 8; /* don't account for UDP header */
551 if (repsize < 4) {
552 printk("RPC: impossible RPC reply size %d!\n", repsize);
553 goto dropit;
556 /* Look up the request corresponding to the given XID */
557 if (!(rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + 8))))
558 goto dropit;
559 task = rovr->rq_task;
561 dprintk("RPC: %4d received reply\n", task->tk_pid);
562 xprt_pktdump("packet data:", (u32 *) (skb->h.raw+8), repsize);
564 if ((copied = rovr->rq_rlen) > repsize)
565 copied = repsize;
567 /* Okay, we have it. Copy datagram... */
568 memcpy(iov, rovr->rq_rvec, rovr->rq_rnr * sizeof(iov[0]));
569 oldfs = get_fs(); set_fs(get_ds());
570 skb_copy_datagram_iovec(skb, 8, iov, copied);
571 set_fs(oldfs);
573 xprt_complete_rqst(xprt, rovr, copied);
575 dropit:
576 skb_free_datagram(sk, skb);
577 return;
581 * TCP record receive routine
582 * This is not the most efficient code since we call recvfrom twice--
583 * first receiving the record marker and XID, then the data.
585 * The optimal solution would be a RPC support in the TCP layer, which
586 * would gather all data up to the next record marker and then pass us
587 * the list of all TCP segments ready to be copied.
589 static inline int
590 tcp_input_record(struct rpc_xprt *xprt)
592 struct rpc_rqst *req;
593 struct iovec *iov;
594 struct iovec riov;
595 u32 offset;
596 int result, maxcpy, reclen, avail, want;
598 dprintk("RPC: tcp_input_record\n");
599 offset = xprt->tcp_offset;
600 result = -EAGAIN;
601 if (offset < 4 || (!xprt->tcp_more && offset < 8)) {
602 want = (xprt->tcp_more? 4 : 8) - offset;
603 dprintk("RPC: reading header (%d bytes)\n", want);
604 riov.iov_base = xprt->tcp_recm.data + offset;
605 riov.iov_len = want;
606 result = xprt_recvmsg(xprt, &riov, 1, want);
607 if (result < 0)
608 goto done;
609 offset += result;
610 if (result < want) {
611 result = -EAGAIN;
612 goto done;
615 /* Get the record length and mask out the more_fragments bit */
616 reclen = ntohl(xprt->tcp_reclen);
617 dprintk("RPC: reclen %08x\n", reclen);
618 xprt->tcp_more = (reclen & 0x80000000)? 0 : 1;
619 if (!(reclen &= 0x7fffffff)) {
620 printk(KERN_NOTICE "RPC: empty TCP record.\n");
621 return -ENOTCONN; /* break connection */
623 xprt->tcp_total += reclen;
624 xprt->tcp_reclen = reclen;
626 dprintk("RPC: got xid %08x reclen %d morefrags %d\n",
627 xprt->tcp_xid, xprt->tcp_reclen, xprt->tcp_more);
628 if (!xprt->tcp_copied
629 && (req = xprt_lookup_rqst(xprt, xprt->tcp_xid))) {
630 iov = xprt->tcp_iovec;
631 memcpy(iov, req->rq_rvec, req->rq_rnr * sizeof(iov[0]));
632 #if 0
633 *(u32 *)iov->iov_base = req->rq_xid;
634 #endif
635 iov->iov_base += 4;
636 iov->iov_len -= 4;
637 xprt->tcp_copied = 4;
638 xprt->tcp_rqstp = req;
640 } else {
641 reclen = xprt->tcp_reclen;
644 avail = reclen - (offset - 4);
645 if ((req = xprt->tcp_rqstp) && req->rq_xid == xprt->tcp_xid
646 && req->rq_task->tk_rpcwait == &xprt->pending) {
647 want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
649 dprintk("RPC: %4d TCP receiving %d bytes\n",
650 req->rq_task->tk_pid, want);
651 result = xprt_recvmsg(xprt, xprt->tcp_iovec, req->rq_rnr, want);
652 if (result < 0)
653 goto done;
654 xprt->tcp_copied += result;
655 offset += result;
656 avail -= result;
657 if (result < want) {
658 result = -EAGAIN;
659 goto done;
662 maxcpy = MIN(req->rq_rlen, xprt->tcp_total);
663 if (xprt->tcp_copied == maxcpy && !xprt->tcp_more) {
664 dprintk("RPC: %4d received reply complete\n",
665 req->rq_task->tk_pid);
666 xprt_complete_rqst(xprt, req, xprt->tcp_total);
667 xprt->tcp_copied = 0;
668 xprt->tcp_rqstp = NULL;
670 /* Request must be re-encoded before retransmit */
671 req->rq_damaged = 1;
674 /* Skip over any trailing bytes on short reads */
675 while (avail) {
676 static u8 dummy[64];
678 want = MIN(avail, sizeof(dummy));
679 riov.iov_base = dummy;
680 riov.iov_len = want;
681 dprintk("RPC: TCP skipping %d bytes\n", want);
682 result = xprt_recvmsg(xprt, &riov, 1, want);
683 if (result < 0)
684 goto done;
685 offset += result;
686 avail -= result;
687 if (result < want) {
688 result = -EAGAIN;
689 goto done;
692 if (!xprt->tcp_more)
693 xprt->tcp_total = 0;
694 offset = 0;
696 done:
697 dprintk("RPC: tcp_input_record done (off %d total %d copied %d)\n",
698 offset, xprt->tcp_total, xprt->tcp_copied);
699 xprt->tcp_offset = offset;
700 return result;
704 * TCP task queue stuff
707 static struct rpc_xprt *rpc_xprt_pending = NULL; /* Chain by rx_pending of rpc_xprt's */
710 * This is protected from tcp_data_ready and the stack as its run
711 * inside of the RPC I/O daemon
714 void rpciod_tcp_dispatcher(void)
716 struct rpc_xprt *xprt;
717 int result;
719 dprintk("rpciod_tcp_dispatcher: Queue Running\n");
722 * Empty each pending socket
725 while((xprt=rpc_xprt_pending)!=NULL)
727 int safe_retry=0;
729 rpc_xprt_pending=xprt->rx_pending;
730 xprt->rx_pending_flag=0;
732 dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);
736 if (safe_retry++ > 50)
737 break;
738 result = tcp_input_record(xprt);
740 while (result >= 0);
742 switch (result) {
743 case -EAGAIN:
744 continue;
745 case -ENOTCONN:
746 case -EPIPE:
747 xprt_disconnect(xprt);
748 continue;
749 default:
750 printk(KERN_WARNING "RPC: unexpected error %d from tcp_input_record\n",
751 result);
/*
 *	Kick the RPC I/O daemon so it runs the TCP dispatcher.
 */
extern inline void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}
763 * data_ready callback for TCP. We can't just jump into the
764 * tcp recvmsg functions inside of the network receive bh or
765 * bad things occur. We queue it to pick up after networking
766 * is done.
769 static void tcp_data_ready(struct sock *sk, int len)
771 struct rpc_xprt *xprt;
773 dprintk("RPC: tcp_data_ready...\n");
774 if (!(xprt = xprt_from_sock(sk)))
776 printk("Not a socket with xprt %p\n", sk);
777 return;
779 dprintk("RPC: tcp_data_ready client %p\n", xprt);
780 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
781 sk->state, xprt->connected,
782 sk->dead, sk->zapped);
784 * If we are not waiting for the RPC bh run then
785 * we are now
787 if (!xprt->rx_pending_flag)
789 dprintk("RPC: xprt queue\n");
790 if(rpc_xprt_pending==NULL)
791 tcp_rpciod_queue();
792 xprt->rx_pending_flag=1;
793 xprt->rx_pending=rpc_xprt_pending;
794 rpc_xprt_pending=xprt;
796 else
797 dprintk("RPC: xprt queued already %p\n", xprt);
801 static void
802 tcp_state_change(struct sock *sk)
804 struct rpc_xprt *xprt;
806 if (!(xprt = xprt_from_sock(sk)))
807 return;
808 dprintk("RPC: tcp_state_change client %p...\n", xprt);
809 dprintk("RPC: state %x conn %d dead %d zapped %d\n",
810 sk->state, xprt->connected,
811 sk->dead, sk->zapped);
813 if (sk->state == TCP_ESTABLISHED && !xprt->connected) {
814 xprt->connected = 1;
815 xprt->connecting = 0;
816 rpc_wake_up(&xprt->reconn);
817 } else if (sk->zapped) {
818 rpc_wake_up_status(&xprt->pending, -ENOTCONN);
819 rpc_wake_up_status(&xprt->sending, -ENOTCONN);
820 rpc_wake_up_status(&xprt->reconn, -ENOTCONN);
824 static void
825 tcp_write_space(struct sock *sk)
827 struct rpc_xprt *xprt;
829 if (!(xprt = xprt_from_sock(sk)))
830 return;
831 xprt->write_space = 1;
832 if (xprt->snd_task && !RPC_IS_RUNNING(xprt->snd_task))
833 rpc_wake_up_task(xprt->snd_task);
837 * RPC receive timeout handler.
839 static void
840 xprt_timer(struct rpc_task *task)
842 if (task->tk_rqstp)
843 xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
845 dprintk("RPC: %4d xprt_timer (%s request)\n", task->tk_pid,
846 task->tk_rqstp? "pending" : "backlogged");
848 task->tk_status = -ETIMEDOUT;
849 task->tk_timeout = 0;
850 rpc_wake_up_task(task);
854 * (Partly) transmit the RPC packet
855 * Note that task->tk_status is either 0 or negative on return.
856 * Only when the reply is received will the status be set to a
857 * positive value.
859 static inline int
860 xprt_transmit_some(struct rpc_xprt *xprt, struct rpc_task *task)
862 struct rpc_rqst *req = task->tk_rqstp;
863 int result;
865 task->tk_status = 0;
866 if ((result = xprt_sendmsg(xprt)) >= 0) {
867 if (!xprt->snd_buf.io_len || !xprt->stream) {
868 rpc_wake_up_next(&xprt->sending);
869 return req->rq_slen;
871 result = -EAGAIN;
872 } else if (xprt->stream) {
873 if (result == -ENOTCONN || result == -EPIPE) {
874 xprt_disconnect(xprt);
875 result = -ENOTCONN;
878 return task->tk_status = result;
882 * Place the actual RPC call.
883 * We have to copy the iovec because sendmsg fiddles with its contents.
885 void
886 xprt_transmit(struct rpc_task *task)
888 struct rpc_timeout *timeo;
889 struct rpc_rqst *req = task->tk_rqstp;
890 struct rpc_xprt *xprt = req->rq_xprt;
892 dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid,
893 *(u32 *)(req->rq_svec[0].iov_base));
895 if (xprt->shutdown) {
896 task->tk_status = -EIO;
897 return;
900 /* If we're not already in the process of transmitting our call,
901 * set up everything as needed. */
902 if (xprt->snd_task != task) {
903 /* Write the record marker */
904 if (xprt->stream) {
905 u32 marker;
907 if (!xprt->connected) {
908 task->tk_status = -ENOTCONN;
909 return;
911 marker = htonl(0x80000000|(req->rq_slen-4));
912 *((u32 *) req->rq_svec[0].iov_base) = marker;
915 /* Reset timeout parameters */
916 timeo = &req->rq_timeout;
917 if (timeo->to_retries < 0) {
918 dprintk("RPC: %4d xprt_transmit reset timeo\n",
919 task->tk_pid);
920 timeo->to_retries = xprt->timeout.to_retries;
921 timeo->to_current = timeo->to_initval;
924 #ifdef RPC_PROFILE
925 req->rq_xtime = jiffies;
926 #endif
927 req->rq_gotit = 0;
929 if (xprt->snd_task) {
930 dprintk("RPC: %4d TCP write queue full (task %d)\n",
931 task->tk_pid, xprt->snd_task->tk_pid);
932 rpc_sleep_on(&xprt->sending, task,
933 xprt_transmit_status, NULL);
934 return;
936 xprt->snd_buf = req->rq_snd_buf;
937 xprt->snd_task = task;
940 /* For fast networks/servers we have to put the request on
941 * the pending list now:
943 start_bh_atomic();
944 rpc_add_wait_queue(&xprt->pending, task);
945 task->tk_callback = NULL;
946 end_bh_atomic();
948 /* Continue transmitting the packet/record. We must be careful
949 * to cope with writespace callbacks arriving _after_ we have
950 * called xprt_sendmsg().
952 while (1) {
953 xprt->write_space = 0;
954 if (xprt_transmit_some(xprt, task) != -EAGAIN) {
955 dprintk("RPC: %4d xmit complete\n", task->tk_pid);
956 xprt->snd_task = NULL;
957 return;
960 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
961 task->tk_pid, xprt->snd_buf.io_len,
962 req->rq_slen);
963 task->tk_status = 0;
964 start_bh_atomic();
965 if (!xprt->write_space) {
966 /* Remove from pending */
967 rpc_remove_wait_queue(task);
968 rpc_sleep_on(&xprt->sending, task,
969 xprt_transmit_status, NULL);
970 end_bh_atomic();
971 return;
973 end_bh_atomic();
978 * This callback is invoked when the sending task is forced to sleep
979 * because the TCP write buffers are full
981 static void
982 xprt_transmit_status(struct rpc_task *task)
984 struct rpc_xprt *xprt = task->tk_client->cl_xprt;
986 dprintk("RPC: %4d transmit_status %d\n", task->tk_pid, task->tk_status);
987 if (xprt->snd_task == task) {
988 if (task->tk_status < 0)
989 xprt->snd_task = NULL;
990 xprt_disconnect(xprt);
995 * Wait for the reply to our call.
996 * When the callback is invoked, the congestion window should have
997 * been updated already.
999 void
1000 xprt_receive(struct rpc_task *task)
1002 struct rpc_rqst *req = task->tk_rqstp;
1003 struct rpc_xprt *xprt = req->rq_xprt;
1005 dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
1006 if (xprt->connected == 0) {
1007 task->tk_status = -ENOTCONN;
1008 return;
1012 * Wait until rq_gotit goes non-null, or timeout elapsed.
1014 task->tk_timeout = req->rq_timeout.to_current;
1016 start_bh_atomic();
1017 if (!req->rq_gotit) {
1018 rpc_sleep_on(&xprt->pending, task,
1019 xprt_receive_status, xprt_timer);
1021 end_bh_atomic();
1023 dprintk("RPC: %4d xprt_receive returns %d\n",
1024 task->tk_pid, task->tk_status);
1027 static void
1028 xprt_receive_status(struct rpc_task *task)
1030 struct rpc_xprt *xprt = task->tk_xprt;
1032 if (xprt->stream && xprt->tcp_rqstp == task->tk_rqstp)
1033 xprt->tcp_rqstp = NULL;
1037 * Reserve an RPC call slot.
1040 xprt_reserve(struct rpc_task *task)
1042 struct rpc_xprt *xprt = task->tk_xprt;
1044 /* We already have an initialized request. */
1045 if (task->tk_rqstp)
1046 return 0;
1048 dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
1049 task->tk_pid, xprt->cong, xprt->cwnd);
1050 if ((!RPCXPRT_CONGESTED(xprt) && xprt->free)) {
1051 xprt_reserve_status(task);
1052 task->tk_timeout = 0;
1053 } else if (!task->tk_timeout) {
1054 task->tk_status = -ENOBUFS;
1055 } else {
1056 dprintk("RPC: xprt_reserve waiting on backlog\n");
1057 rpc_sleep_on(&xprt->backlog, task, xprt_reserve_status, NULL);
1059 dprintk("RPC: %4d xprt_reserve returns %d\n",
1060 task->tk_pid, task->tk_status);
1061 return task->tk_status;
1065 * Reservation callback
1067 static void
1068 xprt_reserve_status(struct rpc_task *task)
1070 struct rpc_xprt *xprt = task->tk_xprt;
1071 struct rpc_rqst *req;
1073 if (xprt->shutdown) {
1074 task->tk_status = -EIO;
1075 } else if (task->tk_status < 0) {
1076 /* NOP */
1077 } else if (task->tk_rqstp) {
1078 /* We've already been given a request slot: NOP */
1079 } else if (!RPCXPRT_CONGESTED(xprt)) {
1080 /* OK: There's room for us. Grab a free slot and bump
1081 * congestion value */
1082 req = xprt->free;
1083 if (!req)
1084 goto bad_list;
1085 if (req->rq_xid)
1086 goto bad_used;
1087 xprt->free = req->rq_next;
1088 xprt->cong += RPC_CWNDSCALE;
1089 task->tk_rqstp = req;
1090 req->rq_next = NULL;
1091 xprt_request_init(task, xprt);
1092 } else {
1093 task->tk_status = -EAGAIN;
1096 if (xprt->free && !RPCXPRT_CONGESTED(xprt))
1097 rpc_wake_up_next(&xprt->backlog);
1099 return;
1101 bad_list:
1102 printk("RPC: %4d inconsistent free list (cong %ld cwnd %ld)\n",
1103 task->tk_pid, xprt->cong, xprt->cwnd);
1104 rpc_debug = ~0;
1105 goto bummer;
1106 bad_used:
1107 printk("RPC: used rqst slot %p on free list!\n", req);
1108 bummer:
1109 task->tk_status = -EIO;
1110 xprt->free = NULL;
1111 return;
1115 * Initialize RPC request
1117 static void
1118 xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
1120 struct rpc_rqst *req = task->tk_rqstp;
1121 static u32 xid = 0;
1123 if (!xid)
1124 xid = jiffies;
1126 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);
1127 task->tk_status = 0;
1128 req->rq_gotit = 0;
1129 req->rq_timeout = xprt->timeout;
1130 req->rq_task = task;
1131 req->rq_xprt = xprt;
1132 req->rq_xid = xid++;
1136 * Release an RPC call slot
1138 void
1139 xprt_release(struct rpc_task *task)
1141 struct rpc_xprt *xprt = task->tk_xprt;
1142 struct rpc_rqst *req;
1144 if (!(req = task->tk_rqstp))
1145 return;
1146 task->tk_rqstp = NULL;
1147 memset(req, 0, sizeof(*req)); /* mark unused */
1149 dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
1151 /* remove slot from queue of pending */
1152 start_bh_atomic();
1153 if (task->tk_rpcwait) {
1154 printk("RPC: task of released request still queued!\n");
1155 #ifdef RPC_DEBUG
1156 printk("RPC: (task is on %s)\n", rpc_qname(task->tk_rpcwait));
1157 #endif
1158 rpc_del_timer(task);
1159 rpc_remove_wait_queue(task);
1161 end_bh_atomic();
1163 /* Decrease congestion value. If congestion threshold is not yet
1164 * reached, pass on the request slot.
1165 * This looks kind of kludgy, but it guarantees backlogged requests
1166 * are served in order.
1168 xprt->cong -= RPC_CWNDSCALE;
1169 if (!RPCXPRT_CONGESTED(xprt)) {
1170 struct rpc_task *next = rpc_wake_up_next(&xprt->backlog);
1172 if (next && next->tk_rqstp == 0) {
1173 xprt->cong += RPC_CWNDSCALE;
1174 next->tk_rqstp = req;
1175 xprt_request_init(next, xprt);
1176 return;
1180 req->rq_next = xprt->free;
1181 xprt->free = req;
1185 * Set default timeout parameters
1187 void
1188 xprt_default_timeout(struct rpc_timeout *to, int proto)
1190 if (proto == IPPROTO_UDP)
1191 xprt_set_timeout(to, 5, 5 * HZ);
1192 else
1193 xprt_set_timeout(to, 5, 15 * HZ);
1197 * Set constant timeout
1199 void
1200 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
1202 to->to_current =
1203 to->to_initval =
1204 to->to_increment = incr;
1205 to->to_maxval = incr * retr;
1206 to->to_resrvval = incr * retr;
1207 to->to_retries = retr;
1208 to->to_exponential = 0;
1212 * Initialize an RPC client
1214 static struct rpc_xprt *
1215 xprt_setup(struct socket *sock, int proto,
1216 struct sockaddr_in *ap, struct rpc_timeout *to)
1218 struct rpc_xprt *xprt;
1219 struct rpc_rqst *req;
1220 struct sock *inet;
1221 int i;
1223 dprintk("RPC: setting up %s transport...\n",
1224 proto == IPPROTO_UDP? "UDP" : "TCP");
1226 #if LINUX_VERSION_CODE >= 0x020100
1227 inet = sock->sk;
1228 #else
1229 inet = (struct sock *) sock->data;
1230 #endif
1232 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
1233 return NULL;
1234 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
1236 xprt->file = NULL;
1237 xprt->sock = sock;
1238 xprt->inet = inet;
1239 xprt->addr = *ap;
1240 xprt->prot = proto;
1241 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
1242 xprt->cwnd = RPC_INITCWND;
1243 #ifdef SOCK_HAS_USER_DATA
1244 inet->user_data = xprt;
1245 #else
1246 xprt->link = sock_list;
1247 sock_list = xprt;
1248 #endif
1249 xprt->old_data_ready = inet->data_ready;
1250 xprt->old_state_change = inet->state_change;
1251 xprt->old_write_space = inet->write_space;
1252 if (proto == IPPROTO_UDP) {
1253 inet->data_ready = udp_data_ready;
1254 } else {
1255 inet->data_ready = tcp_data_ready;
1256 inet->state_change = tcp_state_change;
1257 inet->write_space = tcp_write_space;
1258 xprt->nocong = 1;
1260 xprt->connected = 1;
1262 /* Set timeout parameters */
1263 if (to) {
1264 xprt->timeout = *to;
1265 xprt->timeout.to_current = to->to_initval;
1266 xprt->timeout.to_resrvval = to->to_maxval << 1;
1267 } else {
1268 xprt_default_timeout(&xprt->timeout, xprt->prot);
1271 xprt->pending = RPC_INIT_WAITQ("xprt_pending");
1272 xprt->sending = RPC_INIT_WAITQ("xprt_sending");
1273 xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
1274 xprt->reconn = RPC_INIT_WAITQ("xprt_reconn");
1276 /* initialize free list */
1277 for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
1278 req->rq_next = req + 1;
1279 req->rq_next = NULL;
1280 xprt->free = xprt->slot;
1282 dprintk("RPC: created transport %p\n", xprt);
1285 * TCP requires the rpc I/O daemon is present
1287 if(proto==IPPROTO_TCP)
1288 rpciod_up();
1289 return xprt;
/*
 * Create and initialize an RPC client given an open file.
 * This is obsolete now and compiled out (#if 0); xprt_create_proto()
 * builds a transport from a protocol and peer address instead.
 */
#if 0
struct rpc_xprt *
xprt_create(struct file *file, struct sockaddr_in *ap, struct rpc_timeout *to)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;
	int		proto;

	if (file == NULL) {
		printk("RPC: file == NULL in xprt_create!\n");
		return NULL;
	}

	sock = &file->f_inode->u.socket_i;
	if (sock->ops->family != PF_INET) {
		printk(KERN_WARNING "RPC: only INET sockets supported\n");
		return NULL;
	}

	proto = (sock->type == SOCK_DGRAM)? IPPROTO_UDP : IPPROTO_TCP;
	xprt = xprt_setup(sock, proto, ap, to);
	if (xprt == NULL)
		return NULL;

	/* The transport keeps a reference to the file. */
	xprt->file = file;
	file->f_count++;
	return xprt;
}
#endif
1326 * Bind to a reserved port
1328 static inline int
1329 xprt_bindresvport(struct socket *sock)
1331 struct sockaddr_in myaddr;
1332 int err, port;
1334 memset(&myaddr, 0, sizeof(myaddr));
1335 myaddr.sin_family = AF_INET;
1336 port = 800;
1337 do {
1338 myaddr.sin_port = htons(port);
1339 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,
1340 sizeof(myaddr));
1341 } while (err == -EADDRINUSE && --port > 0);
1343 if (err < 0)
1344 printk("RPC: Can't bind to reserved port (%d).\n", -err);
1346 return err;
1350 * Create a client socket given the protocol and peer address.
1352 static struct socket *
1353 xprt_create_socket(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1355 struct socket *sock;
1356 int type, err;
1358 dprintk("RPC: xprt_create_socket(%08lx, %s %d)\n",
1359 sap? ntohl(sap->sin_addr.s_addr) : 0,
1360 (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
1362 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
1363 if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {
1364 printk("RPC: can't create socket (%d).\n", -err);
1365 goto failed;
1368 /* If the caller has root privs, bind to a reserved port */
1369 if (!current->fsuid && xprt_bindresvport(sock) < 0)
1370 goto failed;
1372 if (type == SOCK_STREAM && sap) {
1373 err = sock->ops->connect(sock, (struct sockaddr *) sap,
1374 sizeof(*sap), 0);
1375 if (err < 0) {
1376 printk("RPC: TCP connect failed (%d).\n", -err);
1377 goto failed;
1381 return sock;
1383 failed:
1384 sock_release(sock);
1385 return NULL;
1389 * Create an RPC client transport given the protocol and peer address.
1391 struct rpc_xprt *
1392 xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
1394 struct socket *sock;
1395 struct rpc_xprt *xprt;
1397 dprintk("RPC: xprt_create_proto called\n");
1399 if (!(sock = xprt_create_socket(proto, sap, to)))
1400 return NULL;
1402 if (!(xprt = xprt_setup(sock, proto, sap, to)))
1403 sock_release(sock);
1405 return xprt;
1409 * Prepare for transport shutdown.
1411 void
1412 xprt_shutdown(struct rpc_xprt *xprt)
1414 xprt->shutdown = 1;
1415 rpc_wake_up(&xprt->sending);
1416 rpc_wake_up(&xprt->pending);
1417 rpc_wake_up(&xprt->backlog);
1418 rpc_wake_up(&xprt->reconn);
1422 * Destroy an RPC transport, killing off all requests.
1425 xprt_destroy(struct rpc_xprt *xprt)
1427 #ifndef SOCK_HAS_USER_DATA
1428 struct rpc_xprt **q;
1430 for (q = &sock_list; *q && *q != xprt; q = &((*q)->link))
1432 if (!*q) {
1433 printk(KERN_WARNING "xprt_destroy: unknown socket!\n");
1434 return -EIO; /* why is there no EBUGGYSOFTWARE */
1436 *q = xprt->link;
1437 #endif
1439 dprintk("RPC: destroying transport %p\n", xprt);
1440 xprt_close(xprt);
1441 kfree(xprt);
1443 return 0;