/*
 *  linux/net/sunrpc/sched.c
 *
 *  Scheduling for synchronous and asynchronous RPC requests.
 *
 *  Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#define __KERNEL_SYSCALLS__
#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
static int			rpc_task_id;
#endif
/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static kmem_cache_t	*rpc_task_slabp;
static kmem_cache_t	*rpc_buffer_slabp;
static mempool_t	*rpc_task_mempool;
static mempool_t	*rpc_buffer_mempool;

static void		__rpc_default_timer(struct rpc_task *task);
static void		rpciod_killall(void);

/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static RPC_WAITQ(schedq, "schedq");

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema);
static unsigned int	rpciod_users;
static pid_t		rpciod_pid;
static int		rpc_inhibit;

/*
 * Spinlock for wait queues. Access to the latter also has to be
 * interrupt-safe in order to allow timers to wake up sleeping tasks.
 */
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/*
 * Spinlock for other critical sections of code.
 */
static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
/*
 * Disable the timer for a given RPC task. Should be called with
 * rpc_queue_lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void
rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	spin_lock_bh(&rpc_queue_lock);
	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	if (callback) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Set up a timer for an already sleeping task.
 */
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!RPC_IS_RUNNING(task))
		__rpc_add_timer(task, timer);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding rpc_queue_lock.
 */
static inline void
rpc_delete_timer(struct rpc_task *task)
{
	if (timer_pending(&task->tk_timer)) {
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
		del_timer_sync(&task->tk_timer);
	}
}
/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static inline int
__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_rpcwait == queue)
		return 0;

	if (task->tk_rpcwait) {
		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
		return -EWOULDBLOCK;
	}
	if (RPC_IS_SWAPPER(task))
		list_add(&task->tk_list, &queue->tasks);
	else
		list_add_tail(&task->tk_list, &queue->tasks);
	task->tk_rpcwait = queue;

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));

	return 0;
}
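/*
 * Add a task to a wait queue, taking rpc_queue_lock.
 */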
int
rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
	int		result;

	spin_lock_bh(&rpc_queue_lock);
	result = __rpc_add_wait_queue(q, task);
	spin_unlock_bh(&rpc_queue_lock);
	return result;
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static inline void
__rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue = task->tk_rpcwait;

	if (!queue)
		return;

	list_del(&task->tk_list);
	task->tk_rpcwait = NULL;

	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));
}
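/*
 * Remove a task from whatever wait queue it is sleeping on,
 * taking rpc_queue_lock.
 */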
void
rpc_remove_wait_queue(struct rpc_task *task)
{
	if (!task->tk_rpcwait)
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_remove_wait_queue(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static inline void
rpc_make_runnable(struct rpc_task *task)
{
	if (task->tk_timeout_fn) {
		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
		return;
	}
	rpc_set_running(task);
	if (RPC_IS_ASYNC(task)) {
		if (RPC_IS_SLEEPING(task)) {
			int status;
			status = __rpc_add_wait_queue(&schedq, task);
			if (status < 0) {
				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
				task->tk_status = status;
				return;
			}
			rpc_clear_sleeping(task);
			if (waitqueue_active(&rpciod_idle))
				wake_up(&rpciod_idle);
		}
	} else {
		rpc_clear_sleeping(task);
		if (waitqueue_active(&task->tk_wait))
			wake_up(&task->tk_wait);
	}
}

/*
 * Place a newly initialized task on the schedq.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_set_sleeping(task);
	rpc_make_runnable(task);
}

/*
 *	For other people who may need to wake the I/O daemon
 *	but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
	if(rpciod_pid==0)
		printk(KERN_ERR "rpciod: wot no daemon?\n");
	if (waitqueue_active(&rpciod_idle))
		wake_up(&rpciod_idle);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	int status;

	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_active = 1;
		rpc_set_sleeping(task);
	}

	status = __rpc_add_wait_queue(q, task);
	if (status) {
		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
		task->tk_status = status;
	} else {
		rpc_clear_running(task);
		if (task->tk_callback) {
			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
			BUG();
		}
		task->tk_callback = action;
		__rpc_add_timer(task, timer);
	}
}

void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&rpc_queue_lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&rpc_queue_lock);
}
/**
 * __rpc_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold rpc_queue_lock
 */
static void
__rpc_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
					task->tk_pid, jiffies, rpc_inhibit);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}
	if (RPC_IS_RUNNING(task))
		return;

	__rpc_disable_timer(task);
	if (task->tk_rpcwait != &schedq)
		__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
	if (RPC_IS_RUNNING(task))
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&rpc_queue_lock);
	task_for_first(task, &queue->tasks)
		__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks))
		task_for_first(task, &queue->tasks)
			__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks)) {
		task_for_first(task, &queue->tasks) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
	}
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	if (!RPC_IS_RUNNING(task)) {
		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
		return 0;
	}

 restarted:
	while (1) {
		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback=task->tk_callback;
			task->tk_callback=NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (RPC_IS_RUNNING(task)) {
			/*
			 * Garbage collection of pending timers...
			 */
			rpc_delete_timer(task);
			if (!task->tk_action)
				break;
			task->tk_action(task);
		}

		/*
		 * Check whether task is sleeping.
		 */
		spin_lock_bh(&rpc_queue_lock);
		if (!RPC_IS_RUNNING(task)) {
			rpc_set_sleeping(task);
			if (RPC_IS_ASYNC(task)) {
				spin_unlock_bh(&rpc_queue_lock);
				return 0;
			}
		}
		spin_unlock_bh(&rpc_queue_lock);

		while (RPC_IS_SLEEPING(task)) {
			/* sync task: sleep here */
			dprintk("RPC: %4d sync task going to sleep\n",
							task->tk_pid);
			if (current->pid == rpciod_pid)
				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");

			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);

			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			if (task->tk_client->cl_intr && signalled()) {
				dprintk("RPC: %4d got signal\n", task->tk_pid);
				task->tk_flags |= RPC_TASK_KILLED;
				rpc_exit(task, -ERESTARTSYS);
				rpc_wake_up_task(task);
			}
		}
	}

	if (task->tk_exit) {
		task->tk_exit(task);
		/* If tk_action is non-null, the user wants us to restart */
		if (task->tk_action) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				if (task->tk_rqstp)
					xprt_release(task);
				rpc_free(task);
				goto restarted;
			}
			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
		}
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);

	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	int status = -EIO;
	if (rpc_inhibit) {
		printk(KERN_INFO "RPC: execution inhibited!\n");
		goto out_release;
	}

	status = -EWOULDBLOCK;
	if (task->tk_active) {
		printk(KERN_ERR "RPC: active task was run twice!\n");
		goto out_err;
	}

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
 out_release:
	rpc_release_task(task);
 out_err:
	return status;
}
/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
	struct rpc_task	*task;
	int		count = 0;

	dprintk("RPC: rpc_schedule enter\n");
	while (1) {
		spin_lock_bh(&rpc_queue_lock);

		task_for_first(task, &schedq.tasks) {
			__rpc_remove_wait_queue(task);
			spin_unlock_bh(&rpc_queue_lock);

			__rpc_execute(task);
		} else {
			spin_unlock_bh(&rpc_queue_lock);
			break;
		}

		if (++count >= 200 || need_resched()) {
			count = 0;
			schedule();
		}
	}
	dprintk("RPC: rpc_schedule leave\n");
}
/*
 * Allocate memory for RPC purposes.
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void *
rpc_malloc(struct rpc_task *task, size_t size)
{
	int	gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		task->tk_buffer = kmalloc(size, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = size;
	} else {
		task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return task->tk_buffer;
}
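/*
 * Free a buffer obtained from rpc_malloc(). Buffers of exactly
 * RPC_BUFFER_MAXSIZE go back to the mempool, larger ones to kfree().
 */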
void
rpc_free(struct rpc_task *task)
{
	if (task->tk_buffer) {
		if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(task->tk_buffer, rpc_buffer_mempool);
		else
			kfree(task->tk_buffer);
		task->tk_buffer = NULL;
		task->tk_bufsize = 0;
	}
}
/*
 * Creation and deletion of RPC task structures
 */
inline void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
				rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_exit   = callback;
	init_waitqueue_head(&task->tk_wait);
	if (current->uid != current->fsuid || current->gid != current->fsgid)
		task->tk_flags |= RPC_TASK_SETUID;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_suid_retry = 1;

	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	if (clnt)
		atomic_inc(&clnt->cl_users);

#ifdef RPC_DEBUG
	task->tk_magic = 0xf00baa;
	task->tk_pid = rpc_task_id++;
#endif
	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
				current->pid);
}
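/*
 * Allocate and free rpc_task structures from the task mempool.
 */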
static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}
/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task	*task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}
void
rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	/* Protect the execution below. */
	spin_lock_bh(&rpc_queue_lock);

	/* Disable timer to prevent zombie wakeup */
	__rpc_disable_timer(task);

	/* Remove from any wait queue we're still on */
	__rpc_remove_wait_queue(task);

	task->tk_active = 0;

	spin_unlock_bh(&rpc_queue_lock);

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	rpc_free(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}
/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold rpc_queue_lock
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task	*task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks)
		if (task == parent)
			return parent;

	return NULL;
}
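/*
 * Exit handler for child tasks: propagate the child's status to the
 * parent and wake the parent up.
 */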
static void
rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task	*parent;

	spin_lock_bh(&rpc_queue_lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task	*task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}
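/*
 * Put the parent to sleep on childq and kick off the child task.
 */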
void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&rpc_queue_lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks)
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	spin_unlock(&rpc_sched_lock);
}
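/*
 * rpciod_running is upped by rpciod once it has recorded its pid,
 * allowing rpciod_up() to wait for the daemon to start.
 */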
static DECLARE_MUTEX_LOCKED(rpciod_running);

static inline int
rpciod_task_pending(void)
{
	return !list_empty(&schedq.tasks);
}
/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
	wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
	int		rounds = 0;

	MOD_INC_USE_COUNT;
	lock_kernel();
	/*
	 * Let our maker know we're running ...
	 */
	rpciod_pid = current->pid;
	up(&rpciod_running);

	daemonize();

	spin_lock_irq(&current->sig->siglock);
	siginitsetinv(&current->blocked, sigmask(SIGKILL));
	recalc_sigpending();
	spin_unlock_irq(&current->sig->siglock);

	strcpy(current->comm, "rpciod");

	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
	while (rpciod_users) {
		if (signalled()) {
			rpciod_killall();
			flush_signals(current);
		}
		__rpc_schedule();

		if (++rounds >= 64) {	/* safeguard */
			schedule();
			rounds = 0;
		}

		if (!rpciod_task_pending()) {
			dprintk("RPC: rpciod back to sleep\n");
			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
			dprintk("RPC: switch to rpciod\n");
			rounds = 0;
		}
	}

	dprintk("RPC: rpciod shutdown commences\n");
	if (!list_empty(&all_tasks)) {
		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
		rpciod_killall();
	}

	rpciod_pid = 0;
	wake_up(assassin);

	dprintk("RPC: rpciod exiting\n");
	unlock_kernel();
	MOD_DEC_USE_COUNT;
	return 0;
}
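/*
 * Kill all remaining RPC tasks and wait for them to go away.
 * Called by rpciod on receipt of a signal and at shutdown.
 */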
static void
rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		__rpc_schedule();
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sig->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sig->siglock, flags);
}
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	int error = 0;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
	rpciod_users++;
	if (rpciod_pid)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = kernel_thread(rpciod, &rpciod_killer, 0);
	if (error < 0) {
		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	down(&rpciod_running);
	error = 0;
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
	return error;
}
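/*
 * Drop a reference to rpciod. The last user sends SIGKILL to the
 * daemon and waits for it to exit.
 */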
void
rpciod_down(void)
{
	unsigned long flags;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

	if (!rpciod_pid) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}

	kill_proc(rpciod_pid, SIGKILL, 1);
	/*
	 * Usually rpciod will exit very quickly, so we
	 * wait briefly before checking the process id.
	 */
	clear_thread_flag(TIF_SIGPENDING);
	yield();
	/*
	 * Display a message if we're going to wait longer.
	 */
	while (rpciod_pid) {
		dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
		if (signalled()) {
			dprintk("rpciod_down: caught signal\n");
			break;
		}
		interruptible_sleep_on(&rpciod_killer);
	}
	spin_lock_irqsave(&current->sig->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sig->siglock, flags);
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
}
#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks)
		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid, t->tk_msg.rpc_proc->p_proc,
			t->tk_flags, t->tk_status,
			t->tk_client, t->tk_client->cl_prog,
			t->tk_rqstp, t->tk_timeout,
			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
			t->tk_action, t->tk_exit);
	spin_unlock(&rpc_sched_lock);
}
#endif
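/*
 * Tear down the RPC slab caches and mempools. Safe to call even if
 * rpc_init_mempool() failed part-way through.
 */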
void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
		printk(KERN_INFO "rpc_task: not all structures were freed\n");
	if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
		printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
}
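/*
 * Set up the slab caches and mempools backing rpc_task structures
 * and small RPC buffers.
 */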
int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					     sizeof(struct rpc_task),
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
					    mempool_alloc_slab,
					    mempool_free_slab,
					    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
					    mempool_alloc_slab,
					    mempool_free_slab,
					    rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}