/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */
#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>
#include <linux/suspend.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>
#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
static int			rpc_task_id;
#endif
/*
 * RPC slabs and memory pools
 */
#define RPC_BUFFER_MAXSIZE	(2048)
#define RPC_BUFFER_POOLSIZE	(8)
#define RPC_TASK_POOLSIZE	(8)
static kmem_cache_t	*rpc_task_slabp;
static kmem_cache_t	*rpc_buffer_slabp;
static mempool_t	*rpc_task_mempool;
static mempool_t	*rpc_buffer_mempool;
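/*
 * The slabs back the mempools below: keeping a small reserve of task
 * structures and I/O buffers lets some NFS reads and writes proceed
 * even when the allocator is under memory pressure (see rpc_malloc()).
 */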
static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);
/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static RPC_WAITQ(schedq, "schedq");

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_COMPLETION(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema);
static unsigned int		rpciod_users;
static pid_t			rpciod_pid;
static int			rpc_inhibit;
/*
 * Spinlock for wait queues. Access to the latter also has to be
 * interrupt-safe in order to allow timers to wake up sleeping tasks.
 */
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/*
 * Spinlock for other critical sections of code.
 */
static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
/*
 * Disable the timer for a given RPC task. Should be called with
 * rpc_queue_lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
}
/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void
rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	spin_lock_bh(&rpc_queue_lock);
	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	if (callback) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
}
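/*
 * Note the fetch-and-clear of tk_timeout_fn under rpc_queue_lock above:
 * this is what lets __rpc_disable_timer() synchronously disable the
 * callback by NULLing the pointer, without resorting to del_timer_sync().
 */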
/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}
/*
 * Set up a timer for an already sleeping task.
 */
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!RPC_IS_RUNNING(task))
		__rpc_add_timer(task, timer);
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding rpc_queue_lock.
 */
static inline void
rpc_delete_timer(struct rpc_task *task)
{
	if (del_timer_sync(&task->tk_timer))
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
}
/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	q = &queue->tasks[task->tk_priority];
	if (unlikely(task->tk_priority > queue->maxpriority))
		q = &queue->tasks[queue->maxpriority];
	list_for_each_entry(t, q, tk_list) {
		if (t->tk_cookie == task->tk_cookie) {
			list_add_tail(&task->tk_list, &t->tk_links);
			return;
		}
	}
	list_add_tail(&task->tk_list, q);
}
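/*
 * Tasks sharing a cookie are chained off the tk_links list of the first
 * queued task with that cookie, so __rpc_wake_up_next_priority() can
 * serve a whole batch from a single queue entry.
 */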
/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_rpcwait == queue)
		return 0;

	if (task->tk_rpcwait) {
		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
		return -EWOULDBLOCK;
	}
	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task);
	else if (RPC_IS_SWAPPER(task))
		list_add(&task->tk_list, &queue->tasks[0]);
	else
		list_add_tail(&task->tk_list, &queue->tasks[0]);
	task->tk_rpcwait = queue;

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));

	return 0;
}
int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
	int result;

	spin_lock_bh(&rpc_queue_lock);
	result = __rpc_add_wait_queue(q, task);
	spin_unlock_bh(&rpc_queue_lock);
	return result;
}
/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	struct rpc_task *t;

	if (!list_empty(&task->tk_links)) {
		t = list_entry(task->tk_links.next, struct rpc_task, tk_list);
		list_move(&t->tk_list, &task->tk_list);
		list_splice_init(&task->tk_links, &t->tk_links);
	}
	list_del(&task->tk_list);
}
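/*
 * If the departing task headed a batch of same-cookie tasks, the next
 * task of the batch is moved into its place and inherits the remaining
 * tk_links entries, so the batch stays intact.
 */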
/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static void __rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue = task->tk_rpcwait;

	if (!queue)
		return;

	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->tk_list);
	task->tk_rpcwait = NULL;

	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));
}
void
rpc_remove_wait_queue(struct rpc_task *task)
{
	if (!task->tk_rpcwait)
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_remove_wait_queue(task);
	spin_unlock_bh(&rpc_queue_lock);
}
static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	queue->priority = priority;
	queue->count = 1 << (priority * 2);
}

static inline void rpc_set_waitqueue_cookie(struct rpc_wait_queue *queue, unsigned long cookie)
{
	queue->cookie = cookie;
	queue->nr = RPC_BATCH_COUNT;
}

static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
	rpc_set_waitqueue_cookie(queue, 0);
}
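/*
 * queue->count bounds how many tasks are served from one priority level
 * before switching levels; higher levels get exponentially larger shares
 * (1 << (priority * 2)). queue->nr bounds how many same-cookie tasks are
 * served in one batch (RPC_BATCH_COUNT).
 */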
static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
{
	int i;

	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = maxprio;
	rpc_reset_waitqueue_priority(queue);
#ifdef RPC_DEBUG
	queue->name = qname;
#endif
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_PRIORITY_HIGH);
}

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 0);
}
EXPORT_SYMBOL(rpc_init_wait_queue);
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static inline void
rpc_make_runnable(struct rpc_task *task)
{
	if (task->tk_timeout_fn) {
		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
		return;
	}
	rpc_set_running(task);
	if (RPC_IS_ASYNC(task)) {
		if (RPC_IS_SLEEPING(task)) {
			int status;

			status = __rpc_add_wait_queue(&schedq, task);
			if (status < 0) {
				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
				task->tk_status = status;
				return;
			}
			rpc_clear_sleeping(task);
			wake_up(&rpciod_idle);
		}
	} else {
		rpc_clear_sleeping(task);
		wake_up(&task->tk_wait);
	}
}
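/*
 * Async tasks are handed to rpciod via schedq; sync tasks are woken
 * directly on their private tk_wait queue, where __rpc_execute() sleeps.
 */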
/*
 * Place a newly initialized task on the schedq.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_set_sleeping(task);
	rpc_make_runnable(task);
}
/*
 * For other people who may need to wake the I/O daemon
 * but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
	if (rpciod_pid == 0)
		printk(KERN_ERR "rpciod: wot no daemon?\n");
	wake_up(&rpciod_idle);
}
/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	int status;

	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_active = 1;
		rpc_set_sleeping(task);
	}

	status = __rpc_add_wait_queue(q, task);
	if (status) {
		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
		task->tk_status = status;
	} else {
		rpc_clear_running(task);
		if (task->tk_callback) {
			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
			BUG();
		}
		task->tk_callback = action;
		__rpc_add_timer(task, timer);
	}
}
void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&rpc_queue_lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&rpc_queue_lock);
}
/**
 * __rpc_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold rpc_queue_lock
 */
static void
__rpc_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
					task->tk_pid, jiffies, rpc_inhibit);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}
	if (RPC_IS_RUNNING(task))
		return;

	__rpc_disable_timer(task);
	if (task->tk_rpcwait != &schedq)
		__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}
/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}
/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
	if (RPC_IS_RUNNING(task))
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Wake up the next task on a priority queue.
 */
static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service a batch of tasks from a single cookie.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q)) {
		task = list_entry(q->next, struct rpc_task, tk_list);
		if (queue->cookie == task->tk_cookie) {
			if (--queue->nr)
				goto out;
			list_move_tail(&task->tk_list, q);
		}
		/*
		 * Check if we need to switch queues.
		 */
		if (--queue->count)
			goto new_cookie;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_entry(q->next, struct rpc_task, tk_list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
new_cookie:
	rpc_set_waitqueue_cookie(queue, task->tk_cookie);
out:
	__rpc_wake_up_task(task);
	return task;
}
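/*
 * The do-while above scans the priority levels round-robin, stepping
 * down one level at a time and wrapping from tasks[0] back to the
 * highest level, so lower priorities still get served once the current
 * level's quota is used up.
 */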
/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&rpc_queue_lock);
	if (RPC_IS_PRIORITY(queue))
		task = __rpc_wake_up_next_priority(queue);
	else {
		task_for_first(task, &queue->tasks[0])
			__rpc_wake_up_task(task);
	}
	spin_unlock_bh(&rpc_queue_lock);

	return task;
}
/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs rpc_queue_lock
 */
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;
	struct list_head *head;

	spin_lock_bh(&rpc_queue_lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, tk_list);
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&rpc_queue_lock);
}
/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs rpc_queue_lock
 */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct list_head *head;
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	head = &queue->tasks[queue->maxpriority];
	for (;;) {
		while (!list_empty(head)) {
			task = list_entry(head->next, struct rpc_task, tk_list);
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
		if (head == &queue->tasks[0])
			break;
		head--;
	}
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}
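/*
 * rpc_delay() reuses the ordinary sleep/timer machinery: the task
 * sleeps on delay_queue with __rpc_atrun() as its timeout function,
 * which clears tk_status and wakes the task once the delay expires.
 */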
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	if (!RPC_IS_RUNNING(task)) {
		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
		return 0;
	}

 restarted:
	while (1) {
		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback=task->tk_callback;
			task->tk_callback=NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (RPC_IS_RUNNING(task)) {
			/*
			 * Garbage collection of pending timers...
			 */
			rpc_delete_timer(task);
			if (!task->tk_action)
				break;
			task->tk_action(task);
			/* micro-optimization to avoid spinlock */
			if (RPC_IS_RUNNING(task))
				continue;
		}

		/*
		 * Check whether task is sleeping.
		 */
		spin_lock_bh(&rpc_queue_lock);
		if (!RPC_IS_RUNNING(task)) {
			rpc_set_sleeping(task);
			if (RPC_IS_ASYNC(task)) {
				spin_unlock_bh(&rpc_queue_lock);
				return 0;
			}
		}
		spin_unlock_bh(&rpc_queue_lock);

		if (!RPC_IS_SLEEPING(task))
			continue;
		/* sync task: sleep here */
		dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
		if (current->pid == rpciod_pid)
			printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");

		if (RPC_TASK_UNINTERRUPTIBLE(task)) {
			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
		} else {
			__wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status);
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			if (status == -ERESTARTSYS) {
				dprintk("RPC: %4d got signal\n", task->tk_pid);
				task->tk_flags |= RPC_TASK_KILLED;
				rpc_exit(task, -ERESTARTSYS);
				rpc_wake_up_task(task);
			}
		}
		dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
	}

	if (task->tk_exit) {
		task->tk_exit(task);
		/* If tk_action is non-null, the user wants us to restart */
		if (task->tk_action) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				if (task->tk_rqstp)
					xprt_release(task);
				rpc_free(task);
				goto restarted;
			}
			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
		}
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);

	return status;
}
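/*
 * __rpc_execute() is the core of the state machine: each tk_action
 * step either advances to the next action or puts the task to sleep.
 * An async task simply returns to rpciod when it goes to sleep, while
 * a sync task blocks right here on tk_wait until it is woken again.
 */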
/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	int status = -EIO;

	if (rpc_inhibit) {
		printk(KERN_INFO "RPC: execution inhibited!\n");
		goto out_release;
	}

	status = -EWOULDBLOCK;
	if (task->tk_active) {
		printk(KERN_ERR "RPC: active task was run twice!\n");
		goto out_err;
	}

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
out_release:
	rpc_release_task(task);
out_err:
	return status;
}
/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
	struct rpc_task	*task;
	int		count = 0;

	dprintk("RPC: rpc_schedule enter\n");
	while (1) {
		task_for_first(task, &schedq.tasks[0]) {
			__rpc_remove_wait_queue(task);
			spin_unlock_bh(&rpc_queue_lock);

			__rpc_execute(task);
			spin_lock_bh(&rpc_queue_lock);
		} else {
			break;
		}

		if (++count >= 200 || need_resched()) {
			count = 0;
			spin_unlock_bh(&rpc_queue_lock);
			schedule();
			spin_lock_bh(&rpc_queue_lock);
		}
	}
	dprintk("RPC: rpc_schedule leave\n");
}
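/*
 * __rpc_schedule() runs with rpc_queue_lock held; the lock is dropped
 * around each __rpc_execute() call and around the voluntary schedule()
 * after every 200 tasks, so rpciod cannot monopolize the CPU.
 */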
/*
 * Allocate memory for RPC purposes.
 *
 * We try to ensure that some NFS reads and writes can always proceed
 * by using a mempool when allocating 'small' buffers.
 * In order to avoid memory starvation triggering more writebacks of
 * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
 */
void *
rpc_malloc(struct rpc_task *task, size_t size)
{
	int	gfp;

	if (task->tk_flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else
		gfp = GFP_NOFS;

	if (size > RPC_BUFFER_MAXSIZE) {
		task->tk_buffer = kmalloc(size, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = size;
	} else {
		task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
		if (task->tk_buffer)
			task->tk_bufsize = RPC_BUFFER_MAXSIZE;
	}
	return task->tk_buffer;
}
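/*
 * Buffers of RPC_BUFFER_MAXSIZE or less come from the mempool and are
 * always recorded with tk_bufsize == RPC_BUFFER_MAXSIZE; that is how
 * rpc_free() below distinguishes the two allocation paths.
 */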
void
rpc_free(struct rpc_task *task)
{
	if (task->tk_buffer) {
		if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
			mempool_free(task->tk_buffer, rpc_buffer_mempool);
		else
			kfree(task->tk_buffer);
		task->tk_buffer = NULL;
		task->tk_bufsize = 0;
	}
}
/*
 * Creation and deletion of RPC task structures
 */
void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_exit   = callback;
	init_waitqueue_head(&task->tk_wait);
	if (current->uid != current->fsuid || current->gid != current->fsgid)
		task->tk_flags |= RPC_TASK_SETUID;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_suid_retry = 1;

	task->tk_priority = RPC_PRIORITY_NORMAL;
	task->tk_cookie = (unsigned long)current;
	INIT_LIST_HEAD(&task->tk_links);

	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	if (clnt) {
		atomic_inc(&clnt->cl_users);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		if (!clnt->cl_intr)
			task->tk_flags |= RPC_TASK_NOINTR;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0xf00baa;
	task->tk_pid = rpc_task_id++;
#endif
	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
				current->pid);
}
static struct rpc_task *
rpc_alloc_task(void)
{
	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	mempool_free(task, rpc_task_mempool);
}
/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task	*task;

	task = rpc_alloc_task();
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}
void
rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	/* Protect the execution below. */
	spin_lock_bh(&rpc_queue_lock);

	/* Disable timer to prevent zombie wakeup */
	__rpc_disable_timer(task);

	/* Remove from any wait queue we're still on */
	__rpc_remove_wait_queue(task);

	task->tk_active = 0;

	spin_unlock_bh(&rpc_queue_lock);

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	rpc_free(task);
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}
/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold rpc_queue_lock
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task	*task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks[0])
		if (task == parent)
			return parent;

	return NULL;
}
static void
rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task	*parent;

	spin_lock_bh(&rpc_queue_lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&rpc_queue_lock);
}
/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task	*task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}
void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&rpc_queue_lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&rpc_queue_lock);
}
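/*
 * Parent and child are set up under a single hold of rpc_queue_lock:
 * the parent is queued on childq before the child is allowed to run,
 * so the child cannot complete and call rpc_child_exit() too early.
 */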
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks)
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	spin_unlock(&rpc_sched_lock);
}
static DECLARE_MUTEX_LOCKED(rpciod_running);

static inline int
rpciod_task_pending(void)
{
	return !list_empty(&schedq.tasks[0]);
}
/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
	int		rounds = 0;

	lock_kernel();
	/*
	 * Let our maker know we're running ...
	 */
	rpciod_pid = current->pid;
	up(&rpciod_running);

	daemonize("rpciod");
	allow_signal(SIGKILL);

	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
	spin_lock_bh(&rpc_queue_lock);
	while (rpciod_users) {
		DEFINE_WAIT(wait);
		if (signalled()) {
			spin_unlock_bh(&rpc_queue_lock);
			rpciod_killall();
			flush_signals(current);
			spin_lock_bh(&rpc_queue_lock);
		}
		__rpc_schedule();
		if (current->flags & PF_FREEZE) {
			spin_unlock_bh(&rpc_queue_lock);
			refrigerator(PF_FREEZE);
			spin_lock_bh(&rpc_queue_lock);
		}

		if (++rounds >= 64) {	/* safeguard */
			spin_unlock_bh(&rpc_queue_lock);
			schedule();
			rounds = 0;
			spin_lock_bh(&rpc_queue_lock);
		}

		dprintk("RPC: rpciod back to sleep\n");
		prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE);
		if (!rpciod_task_pending() && !signalled()) {
			spin_unlock_bh(&rpc_queue_lock);
			schedule();
			rounds = 0;
			spin_lock_bh(&rpc_queue_lock);
		}
		finish_wait(&rpciod_idle, &wait);
		dprintk("RPC: switch to rpciod\n");
	}
	spin_unlock_bh(&rpc_queue_lock);

	dprintk("RPC: rpciod shutdown commences\n");
	if (!list_empty(&all_tasks)) {
		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
		rpciod_killall();
	}

	dprintk("RPC: rpciod exiting\n");
	unlock_kernel();

	rpciod_pid = 0;
	complete_and_exit(&rpciod_killer, 0);
	return 0;
}
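/*
 * The prepare_to_wait()/finish_wait() pair above re-checks for pending
 * tasks and signals after rpciod has queued itself on rpciod_idle,
 * which closes the race against wake-ups arriving in between.
 */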
static void
rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		clear_thread_flag(TIF_SIGPENDING);
		rpc_killall_tasks(NULL);
		spin_lock_bh(&rpc_queue_lock);
		__rpc_schedule();
		spin_unlock_bh(&rpc_queue_lock);
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	int error = 0;

	down(&rpciod_sema);
	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
	rpciod_users++;
	if (rpciod_pid)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = kernel_thread(rpciod, NULL, 0);
	if (error < 0) {
		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	down(&rpciod_running);
	error = 0;
out:
	up(&rpciod_sema);
	return error;
}
void
rpciod_down(void)
{
	down(&rpciod_sema);
	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

	if (!rpciod_pid) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}

	kill_proc(rpciod_pid, SIGKILL, 1);
	wait_for_completion(&rpciod_killer);
out:
	up(&rpciod_sema);
}
#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks)
		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid,
			(t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
			t->tk_flags, t->tk_status,
			t->tk_client,
			(t->tk_client ? t->tk_client->cl_prog : 0),
			t->tk_rqstp, t->tk_timeout,
			rpc_qname(t->tk_rpcwait),
			t->tk_action, t->tk_exit);
	spin_unlock(&rpc_sched_lock);
}
#endif
void
rpc_destroy_mempool(void)
{
	if (rpc_buffer_mempool)
		mempool_destroy(rpc_buffer_mempool);
	if (rpc_task_mempool)
		mempool_destroy(rpc_task_mempool);
	if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
		printk(KERN_INFO "rpc_task: not all structures were freed\n");
	if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
		printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
}
int
rpc_init_mempool(void)
{
	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					sizeof(struct rpc_task),
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					RPC_BUFFER_MAXSIZE,
					0, SLAB_HWCACHE_ALIGN,
					NULL, NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
					mempool_alloc_slab,
					mempool_free_slab,
					rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
					mempool_alloc_slab,
					mempool_free_slab,
					rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}
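/*
 * On any allocation failure above we fall through to
 * rpc_destroy_mempool(), which tears down only what was actually
 * created; each teardown step is guarded by a NULL check.
 */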