/*
 * linux/net/sunrpc/sched.c
 * Scheduling for synchronous and asynchronous RPC requests.
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 */
#include <linux/module.h>

#define __KERNEL_SYSCALLS__
#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/malloc.h>
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>

#include <linux/sunrpc/clnt.h>

#define RPCDBG_FACILITY         RPCDBG_SCHED
static int rpc_task_id = 0;
/*
 * We give RPC the same get_free_pages priority as NFS
 */
#define GFP_RPC                 GFP_NFS
static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void);
/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static struct rpc_wait_queue schedq = RPC_INIT_WAITQ("schedq");
/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static struct rpc_wait_queue childq = RPC_INIT_WAITQ("childq");
/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static struct rpc_wait_queue delay_queue = RPC_INIT_WAITQ("delayq");
/*
 * All RPC tasks are linked into this list
 */
static struct rpc_task *all_tasks = NULL;
/*
 * rpciod-related stuff
 */
static struct wait_queue *rpciod_idle = NULL;
static struct wait_queue *rpciod_killer = NULL;
static struct semaphore rpciod_sema = MUTEX;
static unsigned int rpciod_users = 0;
static pid_t rpciod_pid = 0;
static int rpc_inhibit = 0;
/*
 * This is the last-ditch buffer for NFS swap requests
 */
static u32 swap_buffer[PAGE_SIZE >> 2];
static int swap_buffer_used = 0;
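
/*
 * swap_buffer is a single, statically allocated page: rpc_allocate()
 * below hands it out at most once (guarded by swap_buffer_used) when
 * kmalloc fails for a swapper task, and rpc_free() gives it back by
 * clearing swap_buffer_used.
 */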
/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
void
rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
        if (task->tk_rpcwait) {
                if (task->tk_rpcwait != queue)
                        printk(KERN_WARNING "RPC: doubly enqueued task!\n");
                return;
        }
        if (RPC_IS_SWAPPER(task))
                rpc_insert_list(&queue->task, task);
        else
                rpc_append_list(&queue->task, task);
        task->tk_rpcwait = queue;

        dprintk("RPC: %4d added to queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
}
/*
 * Remove request from queue.
 * Note: must be called with interrupts disabled.
 */
void
rpc_remove_wait_queue(struct rpc_task *task)
{
        struct rpc_wait_queue *queue;

        if (!(queue = task->tk_rpcwait))
                return;
        rpc_remove_list(&queue->task, task);
        task->tk_rpcwait = NULL;

        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
}
/*
 * Set up a timer for the current task.
 */
void
rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
        unsigned long expires = jiffies + task->tk_timeout;

        dprintk("RPC: %4d setting alarm for %lu ms\n",
                        task->tk_pid, task->tk_timeout * 1000 / HZ);
        if (!timer)
                timer = __rpc_default_timer;
        if (time_before(expires, jiffies)) {
                printk(KERN_ERR "RPC: bad timeout value %ld - setting to 10 sec!\n",
                                        task->tk_timeout);
                expires = jiffies + 10 * HZ;
        }
        task->tk_timer.expires  = expires;
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) timer;
        task->tk_timer.prev     = NULL;
        task->tk_timer.next     = NULL;
        add_timer(&task->tk_timer);
}
/*
 * Delete any timer for the current task.
 * Must be called with interrupts off.
 */
void
rpc_del_timer(struct rpc_task *task)
{
        if (task->tk_timeout) {
                dprintk("RPC: %4d deleting timer\n", task->tk_pid);
                del_timer(&task->tk_timer);
                task->tk_timeout = 0;
        }
}
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * interrupts disabled to protect the wait queue operation.
 */
static void
rpc_make_runnable(struct rpc_task *task)
{
        if (task->tk_timeout) {
                printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
                return;
        }
        if (RPC_IS_ASYNC(task)) {
                rpc_add_wait_queue(&schedq, task);
                wake_up(&rpciod_idle);
        } else {
                wake_up(&task->tk_wait);
        }
        task->tk_flags |= RPC_TASK_RUNNING;
}
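
/*
 * The two wake-up paths above differ on purpose: an async task has no
 * process context of its own, so it is queued on schedq and rpciod is
 * poked to run it; a sync task is executed by the process that issued
 * the call, which sleeps on task->tk_wait inside __rpc_execute() and
 * is simply woken here.
 */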
/*
 * For other people who may need to wake the I/O daemon
 * but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
        if (rpciod_pid == 0)
                printk(KERN_ERR "rpciod: wot no daemon?\n");
        wake_up(&rpciod_idle);
}
/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        unsigned long oldflags;

        dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
                                rpc_qname(q), jiffies);

        /*
         * Protect the execution below.
         */
        save_flags(oldflags); cli();

        rpc_add_wait_queue(q, task);
        task->tk_callback = action;
        if (task->tk_timeout)
                rpc_add_timer(task, timer);
        task->tk_flags &= ~RPC_TASK_RUNNING;

        restore_flags(oldflags);
}
void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
{
        __rpc_sleep_on(q, task, action, timer);
}
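
/*
 * A typical tk_action step parks the task on a wait queue and returns;
 * whoever owns the queue later wakes the task to resume the state
 * machine.  Illustrative sketch only (the queue and callback names are
 * made up, not part of this file):
 *
 *      static void call_reserve(struct rpc_task *task)
 *      {
 *              task->tk_timeout = 5 * HZ;
 *              rpc_sleep_on(&some_queue, task, call_reserved, NULL);
 *      }
 *
 *      ... and, from the code that frees the resource:
 *              rpc_wake_up_next(&some_queue);
 */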
/*
 * Wake up a single task -- must be invoked with bottom halves off.
 *
 * It would probably suffice to cli/sti the del_timer and remove_wait_queue
 * operations individually.
 */
static void
__rpc_wake_up(struct rpc_task *task)
{
        dprintk("RPC: %4d __rpc_wake_up (now %ld inh %d)\n",
                                task->tk_pid, jiffies, rpc_inhibit);

        if (task->tk_magic != 0xf00baa) {
                printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
                return;
        }
        rpc_del_timer(task);
        if (task->tk_rpcwait != &schedq)
                rpc_remove_wait_queue(task);
        if (!RPC_IS_RUNNING(task)) {
                rpc_make_runnable(task);
                task->tk_flags |= RPC_TASK_CALLBACK;
        }
        dprintk("RPC: __rpc_wake_up done\n");
}
/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
        dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
        task->tk_status = -ETIMEDOUT;
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
}
/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
        unsigned long oldflags;

        save_flags(oldflags); cli();
        __rpc_wake_up(task);
        restore_flags(oldflags);
}
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
        unsigned long oldflags;
        struct rpc_task *task;

        dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
        save_flags(oldflags); cli();
        if ((task = queue->task) != 0)
                __rpc_wake_up(task);
        restore_flags(oldflags);

        return task;
}
/*
 * Wake up all tasks on a queue
 */
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
        unsigned long oldflags;

        save_flags(oldflags); cli();
        while (queue->task)
                __rpc_wake_up(queue->task);
        restore_flags(oldflags);
}
/*
 * Wake up all tasks on a queue, and set their status value.
 */
void
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
        struct rpc_task *task;
        unsigned long oldflags;

        save_flags(oldflags); cli();
        while ((task = queue->task) != NULL) {
                task->tk_status = status;
                __rpc_wake_up(task);
        }
        restore_flags(oldflags);
}
/*
 * Run a task at a later time
 */
static void __rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
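
/*
 * Illustrative use of rpc_delay() (the 5 second value is made up):
 *
 *      rpc_delay(task, 5 * HZ);
 *
 * puts the task on delay_queue; when the timeout fires, __rpc_atrun()
 * runs as the timer action and wakes the task again.
 */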
static void
__rpc_atrun(struct rpc_task *task)
{
        task->tk_status = 0;
        rpc_wake_up_task(task);
}
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
        unsigned long oldflags;
        int status = 0;

        dprintk("RPC: %4d rpc_execute flgs %x\n",
                                task->tk_pid, task->tk_flags);

        if (!RPC_IS_RUNNING(task)) {
                printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
                return 0;
        }

        while (1) {
                /*
                 * Execute any pending callback.
                 */
                if (task->tk_flags & RPC_TASK_CALLBACK) {
                        task->tk_flags &= ~RPC_TASK_CALLBACK;
                        if (task->tk_callback) {
                                task->tk_callback(task);
                                task->tk_callback = NULL;
                        }
                }

                /*
                 * No handler for next step means exit.
                 */
                if (!task->tk_action)
                        break;

                /*
                 * Perform the next FSM step.
                 * tk_action may be NULL when the task has been killed
                 */
                if (RPC_IS_RUNNING(task) && task->tk_action)
                        task->tk_action(task);

                /*
                 * Check whether task is sleeping.
                 * Note that if the task goes to sleep in tk_action,
                 * and the RPC reply arrives before we get here, it will
                 * have state RUNNING, but will still be on schedq.
                 */
                save_flags(oldflags); cli();
                if (RPC_IS_RUNNING(task)) {
                        if (task->tk_rpcwait == &schedq)
                                rpc_remove_wait_queue(task);
                } else while (!RPC_IS_RUNNING(task)) {
                        if (RPC_IS_ASYNC(task)) {
                                restore_flags(oldflags);
                                return 0;
                        }

                        /* sync task: sleep here */
                        dprintk("RPC: %4d sync task going to sleep\n",
                                                task->tk_pid);
                        if (current->pid == rpciod_pid)
                                printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
                        sleep_on(&task->tk_wait);

                        /*
                         * When the task received a signal, remove from
                         * any queues etc, and make runnable again.
                         */
                        if (signalled())
                                __rpc_wake_up(task);

                        dprintk("RPC: %4d sync task resuming\n",
                                                task->tk_pid);
                }
                restore_flags(oldflags);

                /*
                 * When a sync task receives a signal, it exits with
                 * -ERESTARTSYS. In order to catch any callbacks that
                 * clean up after sleeping on some queue, we don't
                 * break the loop here, but go around once more.
                 */
                if (!RPC_IS_ASYNC(task) && signalled()) {
                        dprintk("RPC: %4d got signal\n", task->tk_pid);
                        rpc_exit(task, -ERESTARTSYS);
                }
        }

        dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
        status = task->tk_status;

        return status;
}
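
/*
 * Contract of the loop above: each tk_action either finishes its step
 * and points tk_action at the next one, or puts the task to sleep on a
 * wait queue (which clears RPC_TASK_RUNNING).  A NULL tk_action ends
 * the loop.  For an async task the function simply returns while the
 * task sleeps; once the task is woken and put back on schedq, the
 * async scheduler below feeds it to __rpc_execute() again.
 */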
/*
 * User-visible entry point to the scheduler.
 * The recursion protection is for debugging. It should go away once
 * the code has stabilized.
 */
int
rpc_execute(struct rpc_task *task)
{
        static int executing = 0;
        int incr = RPC_IS_ASYNC(task)? 1 : 0;
        int status;

        if (incr) {
                if (rpc_inhibit) {
                        printk(KERN_INFO "RPC: execution inhibited!\n");
                        return -EIO;
                }
                if (executing)
                        printk(KERN_WARNING "RPC: %d tasks executed\n", executing);
        }

        executing += incr;
        status = __rpc_execute(task);
        executing -= incr;

        return status;
}
/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
        struct rpc_task *task;
        int count = 0;
        unsigned long oldflags;
        int need_resched = current->need_resched;

        dprintk("RPC: rpc_schedule enter\n");
        save_flags(oldflags);
        while (1) {
                cli();
                if (!(task = schedq.task))
                        break;
                rpc_remove_wait_queue(task);
                task->tk_flags |= RPC_TASK_RUNNING;
                restore_flags(oldflags);

                __rpc_execute(task);

                if (++count >= 200) {
                        count = 0;
                        need_resched = 1;
                }
                if (need_resched)
                        schedule();
        }
        restore_flags(oldflags);
        dprintk("RPC: rpc_schedule leave\n");
}
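
/*
 * The count check above is a fairness safeguard: after a long burst of
 * runnable async tasks the loop backs off so that the rest of the
 * system (and the process that set need_resched) gets a chance to run.
 */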
/*
 * Allocate memory for RPC purposes.
 *
 * This is yet another tricky issue: For sync requests issued by
 * a user process, we want to make kmalloc sleep if there isn't
 * enough memory. Async requests should not sleep too excessively
 * because that will block rpciod (but that's not dramatic when
 * it's starved of memory anyway). Finally, swapout requests should
 * never sleep at all, and should not trigger another swap_out
 * request through kmalloc which would just increase memory contention.
 *
 * I hope the following gets it right, which gives async requests
 * a slight advantage over sync requests (good for writeback, debatable
 * for readahead):
 *
 *   sync user requests:   GFP_KERNEL
 *   async requests:       GFP_RPC       (== GFP_NFS)
 *   swap requests:        GFP_ATOMIC    (or new GFP_SWAPPER)
 */
void *
rpc_allocate(unsigned int flags, unsigned int size)
{
        u32 *buffer;
        int gfp;

        if (flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
        else if (flags & RPC_TASK_ASYNC)
                gfp = GFP_RPC;
        else
                gfp = GFP_KERNEL;

        do {
                if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
                        dprintk("RPC: allocated buffer %p\n", buffer);
                        return buffer;
                }
                if ((flags & RPC_TASK_SWAPPER) && !swap_buffer_used++) {
                        dprintk("RPC: used last-ditch swap buffer\n");
                        return swap_buffer;
                }
                if (flags & RPC_TASK_ASYNC)
                        return NULL;
                current->state = TASK_INTERRUPTIBLE;
                schedule_timeout(HZ>>4);
        } while (!signalled());

        return NULL;
}
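
/*
 * The task flags double as the allocation policy spelled out above
 * (illustrative calls only):
 *
 *      buf = rpc_allocate(0, size);                    sync user request, GFP_KERNEL
 *      buf = rpc_allocate(RPC_TASK_ASYNC, size);       async request, GFP_RPC
 *      buf = rpc_allocate(RPC_TASK_SWAPPER, size);     swap request, GFP_ATOMIC
 */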
void
rpc_free(void *buffer)
{
        if (buffer != swap_buffer) {
                kfree(buffer);
                return;
        }
        swap_buffer_used = 0;
}
/*
 * Creation and deletion of RPC task structures
 */
void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
                        rpc_action callback, int flags)
{
        memset(task, 0, sizeof(*task));
        task->tk_client = clnt;
        task->tk_flags  = RPC_TASK_RUNNING | flags;
        task->tk_exit   = callback;
        if (current->uid != current->fsuid || current->gid != current->fsgid)
                task->tk_flags |= RPC_TASK_SETUID;

        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;
        task->tk_suid_retry = 1;

        /* Add to global list of all tasks */
        task->tk_next_task = all_tasks;
        task->tk_prev_task = NULL;
        if (all_tasks)
                all_tasks->tk_prev_task = task;
        all_tasks = task;

        task->tk_magic = 0xf00baa;
        task->tk_pid = rpc_task_id++;

        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
}
/*
 * Create a new task for the specified client. We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
        struct rpc_task *task;

        task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
        if (!task)
                goto cleanup;

        rpc_init_task(task, clnt, callback, flags);

        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
out:
        return task;

cleanup:
        /* Check whether to release the client */
        if (clnt) {
                printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
                        clnt->cl_users, clnt->cl_oneshot);
                clnt->cl_users++;       /* pretend we were used ... */
                rpc_release_client(clnt);
        }
        goto out;
}
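
/*
 * Illustrative call sequence (not a verbatim caller from this file;
 * my_first_step and my_exit are made-up handler names):
 *
 *      task = rpc_new_task(clnt, my_exit, RPC_TASK_ASYNC);
 *      if (task) {
 *              task->tk_action = my_first_step;
 *              rpc_execute(task);
 *      }
 */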
void
rpc_release_task(struct rpc_task *task)
{
        struct rpc_task *next, *prev;

        dprintk("RPC: %4d release task\n", task->tk_pid);

        /* Remove from global task list */
        prev = task->tk_prev_task;
        next = task->tk_next_task;
        if (next)
                next->tk_prev_task = prev;
        if (prev)
                prev->tk_next_task = next;
        else
                all_tasks = next;

        /* Release resources */
        rpcauth_releasecred(task);
        if (task->tk_buffer) {
                rpc_free(task->tk_buffer);
                task->tk_buffer = NULL;
        }
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }

        if (task->tk_flags & RPC_TASK_DYNAMIC) {
                dprintk("RPC: %4d freeing task\n", task->tk_pid);
                task->tk_flags &= ~RPC_TASK_DYNAMIC;
                rpc_free(task);
        }
}
/*
 * Handling of RPC child tasks
 * We can't simply call wake_up(parent) here, because the
 * parent task may already have gone away
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
        struct rpc_task *temp, *parent;

        parent = (struct rpc_task *) child->tk_calldata;
        for (temp = childq.task; temp; temp = temp->tk_next) {
                if (temp == parent)
                        return parent;
        }
        return NULL;
}
static void
rpc_child_exit(struct rpc_task *child)
{
        struct rpc_task *parent;

        if ((parent = rpc_find_parent(child)) != NULL) {
                parent->tk_status = child->tk_status;
                rpc_wake_up_task(parent);
        }
        rpc_release_task(child);
}
/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
        struct rpc_task *task;

        task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
        if (!task)
                goto fail;
        task->tk_exit = rpc_child_exit;
        task->tk_calldata = parent;
        return task;

fail:
        parent->tk_status = -ENOMEM;
        return NULL;
}
void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
        unsigned long oldflags;

        save_flags(oldflags); cli();
        rpc_make_runnable(child);
        restore_flags(oldflags);
        /* N.B. Is it possible for the child to have already finished? */
        rpc_sleep_on(&childq, task, func, NULL);
}
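
/*
 * Together, rpc_new_child/rpc_child_exit/rpc_run_child implement the
 * parent/child pattern used e.g. for portmapper lookups: the child is
 * an async task bound to its parent through tk_calldata; rpc_run_child()
 * starts the child and puts the parent to sleep on childq, and
 * rpc_child_exit() copies the child's status back and wakes the parent.
 * Sketch (parent_continue is a made-up callback name):
 *
 *      child = rpc_new_child(clnt, task);
 *      if (child)
 *              rpc_run_child(task, child, parent_continue);
 */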
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task **q, *rovr;

        dprintk("RPC: killing all tasks for client %p\n", clnt);
        /* N.B. Why bother to inhibit? Nothing blocks here ... */
        rpc_inhibit++;
        for (q = &all_tasks; (rovr = *q); q = &rovr->tk_next_task) {
                if (!clnt || rovr->tk_client == clnt) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        }
        rpc_inhibit--;
}
static struct semaphore rpciod_running = MUTEX_LOCKED;
/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
        struct wait_queue **assassin = (struct wait_queue **) ptr;
        unsigned long oldflags;
        int rounds = 0;

        /*
         * Let our maker know we're running ...
         */
        rpciod_pid = current->pid;
        up(&rpciod_running);

        spin_lock_irq(&current->sigmask_lock);
        siginitsetinv(&current->blocked, sigmask(SIGKILL));
        recalc_sigpending(current);
        spin_unlock_irq(&current->sigmask_lock);

        current->session = 1;
        sprintf(current->comm, "rpciod");

        dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
        while (rpciod_users) {
                if (signalled()) {
                        rpciod_killall();
                        flush_signals(current);
                }
                __rpc_schedule();

                if (++rounds >= 64) {   /* safeguard */
                        schedule();
                        rounds = 0;
                }
                save_flags(oldflags); cli();
                if (!schedq.task) {
                        dprintk("RPC: rpciod back to sleep\n");
                        interruptible_sleep_on(&rpciod_idle);
                        dprintk("RPC: switch to rpciod\n");
                        rpciod_tcp_dispatcher();
                        rounds = 0;
                }
                restore_flags(oldflags);
        }

        dprintk("RPC: rpciod shutdown commences\n");
        if (all_tasks) {
                printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
                rpciod_killall();
        }

        rpciod_pid = 0;
        wake_up(assassin);

        dprintk("RPC: rpciod exiting\n");
        return 0;
}
static void
rpciod_killall(void)
{
        unsigned long flags;

        while (all_tasks) {
                current->sigpending = 0;
                rpc_killall_tasks(NULL);
                __rpc_schedule();
                if (all_tasks) {
                        dprintk("rpciod_killall: waiting for tasks to exit\n");
                        current->state = TASK_INTERRUPTIBLE;
                        schedule_timeout(1);
                }
        }

        spin_lock_irqsave(&current->sigmask_lock, flags);
        recalc_sigpending(current);
        spin_unlock_irqrestore(&current->sigmask_lock, flags);
}
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
        int error = 0;

        down(&rpciod_sema);
        dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
        rpciod_users++;
        if (rpciod_pid)
                goto out;
        /*
         * If there's no pid, we should be the first user.
         */
        if (rpciod_users > 1)
                printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
        /*
         * Create the rpciod thread and wait for it to start.
         */
        error = kernel_thread(rpciod, &rpciod_killer, 0);
        if (error < 0) {
                printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
        down(&rpciod_running);
        error = 0;
out:
        up(&rpciod_sema);
        return error;
}
void
rpciod_down(void)
{
        unsigned long flags;

        down(&rpciod_sema);
        dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
                printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

        if (!rpciod_pid) {
                dprintk("rpciod_down: Nothing to do!\n");
                goto out;
        }

        kill_proc(rpciod_pid, SIGKILL, 1);
        /*
         * Usually rpciod will exit very quickly, so we
         * wait briefly before checking the process id.
         */
        current->sigpending = 0;
        current->state = TASK_INTERRUPTIBLE;
        schedule_timeout(1);
        /*
         * Display a message if we're going to wait longer.
         */
        while (rpciod_pid) {
                dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
                if (signalled()) {
                        dprintk("rpciod_down: caught signal\n");
                        break;
                }
                interruptible_sleep_on(&rpciod_killer);
        }

        spin_lock_irqsave(&current->sigmask_lock, flags);
        recalc_sigpending(current);
        spin_unlock_irqrestore(&current->sigmask_lock, flags);
out:
        up(&rpciod_sema);
}
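
/*
 * rpciod_up() and rpciod_down() are reference counted: every user of
 * the daemon calls rpciod_up() once and pairs it with rpciod_down().
 * The kernel thread is created for the first user, and the last
 * rpciod_down() kills it with SIGKILL and waits on rpciod_killer for
 * it to exit.
 */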
#include <linux/nfs_fs.h>
void rpc_show_tasks(void)
{
        struct rpc_task *t = all_tasks, *next;
        struct nfs_wreq *wreq;

        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- --exit--\n");
        for (; t; t = next) {
                next = t->tk_next_task;
                printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
                        t->tk_pid, t->tk_proc, t->tk_flags, t->tk_status,
                        t->tk_client, t->tk_client->cl_prog,
                        t->tk_rqstp, t->tk_timeout,
                        t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
                        t->tk_action, t->tk_exit);

                if (!(t->tk_flags & RPC_TASK_NFSWRITE))
                        continue;
                /* NFS write requests */
                wreq = (struct nfs_wreq *) t->tk_calldata;
                printk(" NFS: flgs=%08x, pid=%d, pg=%p, off=(%d, %d)\n",
                        wreq->wb_flags, wreq->wb_pid, wreq->wb_page,
                        wreq->wb_offset, wreq->wb_bytes);
                printk(" name=%s/%s\n",
                        wreq->wb_dentry->d_parent->d_name.name,
                        wreq->wb_dentry->d_name.name);
        }
}