1 /* Worker thread pool for slow items, such as filesystem lookups or mkdirs
3 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public Licence
8 * as published by the Free Software Foundation; either version
9 * 2 of the Licence, or (at your option) any later version.
11 * See Documentation/slow-work.txt
14 #include <linux/module.h>
15 #include <linux/slow-work.h>
16 #include <linux/kthread.h>
17 #include <linux/freezer.h>
18 #include <linux/wait.h>
19 #include <linux/debugfs.h>
20 #include "slow-work.h"
22 static void slow_work_cull_timeout(unsigned long);
23 static void slow_work_oom_timeout(unsigned long);
26 static int slow_work_min_threads_sysctl(struct ctl_table
*, int,
27 void __user
*, size_t *, loff_t
*);
29 static int slow_work_max_threads_sysctl(struct ctl_table
*, int ,
30 void __user
*, size_t *, loff_t
*);
/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
/* % of threads that may process very slow work */
static unsigned vslow_work_proportion = 50;
45 static const int slow_work_min_min_threads
= 2;
46 static int slow_work_max_max_threads
= SLOW_WORK_THREAD_LIMIT
;
47 static const int slow_work_min_vslow
= 1;
48 static const int slow_work_max_vslow
= 99;
50 ctl_table slow_work_sysctls
[] = {
52 .ctl_name
= CTL_UNNUMBERED
,
53 .procname
= "min-threads",
54 .data
= &slow_work_min_threads
,
55 .maxlen
= sizeof(unsigned),
57 .proc_handler
= slow_work_min_threads_sysctl
,
58 .extra1
= (void *) &slow_work_min_min_threads
,
59 .extra2
= &slow_work_max_threads
,
62 .ctl_name
= CTL_UNNUMBERED
,
63 .procname
= "max-threads",
64 .data
= &slow_work_max_threads
,
65 .maxlen
= sizeof(unsigned),
67 .proc_handler
= slow_work_max_threads_sysctl
,
68 .extra1
= &slow_work_min_threads
,
69 .extra2
= (void *) &slow_work_max_max_threads
,
72 .ctl_name
= CTL_UNNUMBERED
,
73 .procname
= "vslow-percentage",
74 .data
= &vslow_work_proportion
,
75 .maxlen
= sizeof(unsigned),
77 .proc_handler
= &proc_dointvec_minmax
,
78 .extra1
= (void *) &slow_work_min_vslow
,
79 .extra2
= (void *) &slow_work_max_vslow
,
86 * The active state of the thread pool
88 static atomic_t slow_work_thread_count
;
89 static atomic_t vslow_work_executing_count
;
91 static bool slow_work_may_not_start_new_thread
;
92 static bool slow_work_cull
; /* cull a thread due to lack of activity */
93 static DEFINE_TIMER(slow_work_cull_timer
, slow_work_cull_timeout
, 0, 0);
94 static DEFINE_TIMER(slow_work_oom_timer
, slow_work_oom_timeout
, 0, 0);
95 static struct slow_work slow_work_new_thread
; /* new thread starter */
98 * slow work ID allocation (use slow_work_queue_lock)
100 static DECLARE_BITMAP(slow_work_ids
, SLOW_WORK_THREAD_LIMIT
);
/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload
 *
 * Each worker thread records, in the slot indexed by its ID, the module that
 * owns the item it is currently handling.  An unregistering module publishes
 * what it is waiting for in slow_work_unreg_module/slow_work_unreg_work_item
 * and sleeps on slow_work_unreg_wq.
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);

/*
 * Note which module owns the item that thread @id is about to process.
 *
 * @work may be NULL: slow_work_execute() calls this before it has checked
 * whether it actually dequeued anything, so the slot is only written when
 * there is an item to take the owner from.
 */
static void slow_work_set_thread_processing(int id, struct slow_work *work)
{
	if (work)
		slow_work_thread_processing[id] = work->owner;
}

/*
 * Thread @id has finished with @work: clear its slot and wake any waiter on
 * slow_work_unreg_wq whose target is this item or its owning module.
 */
static void slow_work_done_thread_processing(int id, struct slow_work *work)
{
	struct module *module = slow_work_thread_processing[id];

	slow_work_thread_processing[id] = NULL;
	/* order the slot being cleared before sampling the unregistration
	 * targets, so that a waiter which has just published its target
	 * cannot both see our slot still occupied and be missed here */
	smp_mb();
	if (slow_work_unreg_work_item == work ||
	    slow_work_unreg_module == module)
		wake_up_all(&slow_work_unreg_wq);
}

/*
 * Clear thread @id's slot without waking anyone.
 */
static void slow_work_clear_thread_processing(int id)
{
	slow_work_thread_processing[id] = NULL;
}
#else
/* no module unloading to guard against: the tracking hooks do nothing */
static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
static void slow_work_clear_thread_processing(int id) {}
#endif
/*
 * Data for tracking currently executing items for indication through /proc
 *
 * Both arrays have one entry per possible worker thread; they are only built
 * when CONFIG_SLOW_WORK_DEBUG is enabled.
 */
#ifdef CONFIG_SLOW_WORK_DEBUG
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif
/*
 * Pending work lives on two lists guarded by a single spinlock: one list for
 * slow items and one for very slow items.  The lists are shared by every CPU;
 * per-CPU queues would make no sense because the number of worker threads
 * bears no relation to the number of CPUs.
 */
LIST_HEAD(slow_work_queue);
LIST_HEAD(vslow_work_queue);
DEFINE_SPINLOCK(slow_work_queue_lock);

/*
 * One wait queue per work list, pinged when a work item is placed on an empty
 * queue.  They let a work item that is hogging a thread by sleeping in a
 * deferrable way yield that thread and enqueue itself instead.
 */
static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
169 * The thread controls. A variable used to signal to the threads that they
170 * should exit when the queue is empty, a waitqueue used by the threads to wait
171 * for signals, and a completion set by the last thread to exit.
173 static bool slow_work_threads_should_exit
;
174 static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq
);
175 static DECLARE_COMPLETION(slow_work_last_thread_exited
);
178 * The number of users of the thread pool and its lock. Whilst this is zero we
179 * have no threads hanging around, and when this reaches zero, we wait for all
180 * active or queued work items to complete and kill all the threads we do have.
182 static int slow_work_user_count
;
183 static DEFINE_MUTEX(slow_work_user_lock
);
185 static inline int slow_work_get_ref(struct slow_work
*work
)
187 if (work
->ops
->get_ref
)
188 return work
->ops
->get_ref(work
);
193 static inline void slow_work_put_ref(struct slow_work
*work
)
195 if (work
->ops
->put_ref
)
196 work
->ops
->put_ref(work
);
200 * Calculate the maximum number of active threads in the pool that are
201 * permitted to process very slow work items.
203 * The answer is rounded up to at least 1, but may not equal or exceed the
204 * maximum number of the threads in the pool. This means we always have at
205 * least one thread that can process slow work items, and we always have at
206 * least one thread that won't get tied up doing so.
208 static unsigned slow_work_calc_vsmax(void)
212 vsmax
= atomic_read(&slow_work_thread_count
) * vslow_work_proportion
;
214 vsmax
= max(vsmax
, 1U);
215 return min(vsmax
, slow_work_max_threads
- 1);
219 * Attempt to execute stuff queued on a slow thread. Return true if we managed
220 * it, false if there was nothing to do.
222 static noinline
bool slow_work_execute(int id
)
224 struct slow_work
*work
= NULL
;
228 vsmax
= slow_work_calc_vsmax();
230 /* see if we can schedule a new thread to be started if we're not
231 * keeping up with the work */
232 if (!waitqueue_active(&slow_work_thread_wq
) &&
233 (!list_empty(&slow_work_queue
) || !list_empty(&vslow_work_queue
)) &&
234 atomic_read(&slow_work_thread_count
) < slow_work_max_threads
&&
235 !slow_work_may_not_start_new_thread
)
236 slow_work_enqueue(&slow_work_new_thread
);
238 /* find something to execute */
239 spin_lock_irq(&slow_work_queue_lock
);
240 if (!list_empty(&vslow_work_queue
) &&
241 atomic_read(&vslow_work_executing_count
) < vsmax
) {
242 work
= list_entry(vslow_work_queue
.next
,
243 struct slow_work
, link
);
244 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING
, &work
->flags
))
246 list_del_init(&work
->link
);
247 atomic_inc(&vslow_work_executing_count
);
249 } else if (!list_empty(&slow_work_queue
)) {
250 work
= list_entry(slow_work_queue
.next
,
251 struct slow_work
, link
);
252 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING
, &work
->flags
))
254 list_del_init(&work
->link
);
257 very_slow
= false; /* avoid the compiler warning */
260 slow_work_set_thread_processing(id
, work
);
262 slow_work_mark_time(work
);
263 slow_work_begin_exec(id
, work
);
266 spin_unlock_irq(&slow_work_queue_lock
);
271 if (!test_and_clear_bit(SLOW_WORK_PENDING
, &work
->flags
))
274 /* don't execute if the work is in the process of being cancelled */
275 if (!test_bit(SLOW_WORK_CANCELLING
, &work
->flags
))
276 work
->ops
->execute(work
);
279 atomic_dec(&vslow_work_executing_count
);
280 clear_bit_unlock(SLOW_WORK_EXECUTING
, &work
->flags
);
282 /* wake up anyone waiting for this work to be complete */
283 wake_up_bit(&work
->flags
, SLOW_WORK_EXECUTING
);
285 slow_work_end_exec(id
, work
);
287 /* if someone tried to enqueue the item whilst we were executing it,
288 * then it'll be left unenqueued to avoid multiple threads trying to
289 * execute it simultaneously
291 * there is, however, a race between us testing the pending flag and
292 * getting the spinlock, and between the enqueuer setting the pending
293 * flag and getting the spinlock, so we use a deferral bit to tell us
294 * if the enqueuer got there first
296 if (test_bit(SLOW_WORK_PENDING
, &work
->flags
)) {
297 spin_lock_irq(&slow_work_queue_lock
);
299 if (!test_bit(SLOW_WORK_EXECUTING
, &work
->flags
) &&
300 test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED
, &work
->flags
))
303 spin_unlock_irq(&slow_work_queue_lock
);
306 /* sort out the race between module unloading and put_ref() */
307 slow_work_put_ref(work
);
308 slow_work_done_thread_processing(id
, work
);
313 /* we must complete the enqueue operation
314 * - we transfer our ref on the item back to the appropriate queue
315 * - don't wake another thread up as we're awake already
317 slow_work_mark_time(work
);
318 if (test_bit(SLOW_WORK_VERY_SLOW
, &work
->flags
))
319 list_add_tail(&work
->link
, &vslow_work_queue
);
321 list_add_tail(&work
->link
, &slow_work_queue
);
322 spin_unlock_irq(&slow_work_queue_lock
);
323 slow_work_clear_thread_processing(id
);
328 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
329 * work: The work item under execution that wants to sleep
330 * _timeout: Scheduler sleep timeout
332 * Allow a requeueable work item to sleep on a slow-work processor thread until
333 * that thread is needed to do some other work or the sleep is interrupted by
336 * The caller must set up a wake up event before calling this and must have set
337 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
338 * condition before calling this function as no test is made here.
340 * False is returned if there is nothing on the queue; true is returned if the
341 * work item should be requeued
343 bool slow_work_sleep_till_thread_needed(struct slow_work
*work
,
344 signed long *_timeout
)
346 wait_queue_head_t
*wfo_wq
;
347 struct list_head
*queue
;
351 if (test_bit(SLOW_WORK_VERY_SLOW
, &work
->flags
)) {
352 wfo_wq
= &vslow_work_queue_waits_for_occupation
;
353 queue
= &vslow_work_queue
;
355 wfo_wq
= &slow_work_queue_waits_for_occupation
;
356 queue
= &slow_work_queue
;
359 if (!list_empty(queue
))
362 add_wait_queue_exclusive(wfo_wq
, &wait
);
363 if (list_empty(queue
))
364 *_timeout
= schedule_timeout(*_timeout
);
365 finish_wait(wfo_wq
, &wait
);
367 return !list_empty(queue
);
369 EXPORT_SYMBOL(slow_work_sleep_till_thread_needed
);
372 * slow_work_enqueue - Schedule a slow work item for processing
373 * @work: The work item to queue
375 * Schedule a slow work item for processing. If the item is already undergoing
376 * execution, this guarantees not to re-enter the execution routine until the
377 * first execution finishes.
379 * The item is pinned by this function as it retains a reference to it, managed
380 * through the item operations. The item is unpinned once it has been executed.
383 * An item may hog the thread that is running it for a relatively large amount
384 * of time, sufficient, for example, to perform several lookup, mkdir, create
385 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
387 * Conversely, if a number of items are awaiting processing, it may take some
388 * time before any given item is given attention. The number of threads in the
389 * pool may be increased to deal with demand, but only up to a limit.
391 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
392 * the very slow queue, from which only a portion of the threads will be
393 * allowed to pick items to execute. This ensures that very slow items won't
394 * overly block ones that are just ordinarily slow.
396 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if an attempt is made to queue work that is being cancelled).
399 int slow_work_enqueue(struct slow_work
*work
)
401 wait_queue_head_t
*wfo_wq
;
402 struct list_head
*queue
;
406 if (test_bit(SLOW_WORK_CANCELLING
, &work
->flags
))
409 BUG_ON(slow_work_user_count
<= 0);
413 /* when honouring an enqueue request, we only promise that we will run
414 * the work function in the future; we do not promise to run it once
415 * per enqueue request
417 * we use the PENDING bit to merge together repeat requests without
418 * having to disable IRQs and take the spinlock, whilst still
419 * maintaining our promise
421 if (!test_and_set_bit_lock(SLOW_WORK_PENDING
, &work
->flags
)) {
422 if (test_bit(SLOW_WORK_VERY_SLOW
, &work
->flags
)) {
423 wfo_wq
= &vslow_work_queue_waits_for_occupation
;
424 queue
= &vslow_work_queue
;
426 wfo_wq
= &slow_work_queue_waits_for_occupation
;
427 queue
= &slow_work_queue
;
430 spin_lock_irqsave(&slow_work_queue_lock
, flags
);
432 if (unlikely(test_bit(SLOW_WORK_CANCELLING
, &work
->flags
)))
435 /* we promise that we will not attempt to execute the work
436 * function in more than one thread simultaneously
438 * this, however, leaves us with a problem if we're asked to
439 * enqueue the work whilst someone is executing the work
440 * function as simply queueing the work immediately means that
441 * another thread may try executing it whilst it is already
444 * to deal with this, we set the ENQ_DEFERRED bit instead of
445 * enqueueing, and the thread currently executing the work
446 * function will enqueue the work item when the work function
447 * returns and it has cleared the EXECUTING bit
449 if (test_bit(SLOW_WORK_EXECUTING
, &work
->flags
)) {
450 set_bit(SLOW_WORK_ENQ_DEFERRED
, &work
->flags
);
452 ret
= slow_work_get_ref(work
);
455 slow_work_mark_time(work
);
456 list_add_tail(&work
->link
, queue
);
457 wake_up(&slow_work_thread_wq
);
459 /* if someone who could be requeued is sleeping on a
460 * thread, then ask them to yield their thread */
461 if (work
->link
.prev
== queue
)
465 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
472 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
475 EXPORT_SYMBOL(slow_work_enqueue
);
477 static int slow_work_wait(void *word
)
484 * slow_work_cancel - Cancel a slow work item
485 * @work: The work item to cancel
487 * This function will cancel a previously enqueued work item. If we cannot
488 * cancel the work item, it is guaranteed to have run when this function returns.
491 void slow_work_cancel(struct slow_work
*work
)
493 bool wait
= true, put
= false;
495 set_bit(SLOW_WORK_CANCELLING
, &work
->flags
);
498 /* if the work item is a delayed work item with an active timer, we
499 * need to wait for the timer to finish _before_ getting the spinlock,
500 * lest we deadlock against the timer routine
502 * the timer routine will leave DELAYED set if it notices the
503 * CANCELLING flag in time
505 if (test_bit(SLOW_WORK_DELAYED
, &work
->flags
)) {
506 struct delayed_slow_work
*dwork
=
507 container_of(work
, struct delayed_slow_work
, work
);
508 del_timer_sync(&dwork
->timer
);
511 spin_lock_irq(&slow_work_queue_lock
);
513 if (test_bit(SLOW_WORK_DELAYED
, &work
->flags
)) {
514 /* the timer routine aborted or never happened, so we are left
515 * holding the timer's reference on the item and should just
516 * drop the pending flag and wait for any ongoing execution to
518 struct delayed_slow_work
*dwork
=
519 container_of(work
, struct delayed_slow_work
, work
);
521 BUG_ON(timer_pending(&dwork
->timer
));
522 BUG_ON(!list_empty(&work
->link
));
524 clear_bit(SLOW_WORK_DELAYED
, &work
->flags
);
526 clear_bit(SLOW_WORK_PENDING
, &work
->flags
);
528 } else if (test_bit(SLOW_WORK_PENDING
, &work
->flags
) &&
529 !list_empty(&work
->link
)) {
530 /* the link in the pending queue holds a reference on the item
531 * that we will need to release */
532 list_del_init(&work
->link
);
535 clear_bit(SLOW_WORK_PENDING
, &work
->flags
);
537 } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED
, &work
->flags
)) {
538 /* the executor is holding our only reference on the item, so
539 * we merely need to wait for it to finish executing */
540 clear_bit(SLOW_WORK_PENDING
, &work
->flags
);
543 spin_unlock_irq(&slow_work_queue_lock
);
545 /* the EXECUTING flag is set by the executor whilst the spinlock is set
546 * and before the item is dequeued - so assuming the above doesn't
547 * actually dequeue it, simply waiting for the EXECUTING flag to be
548 * released here should be sufficient */
550 wait_on_bit(&work
->flags
, SLOW_WORK_EXECUTING
, slow_work_wait
,
551 TASK_UNINTERRUPTIBLE
);
553 clear_bit(SLOW_WORK_CANCELLING
, &work
->flags
);
555 slow_work_put_ref(work
);
557 EXPORT_SYMBOL(slow_work_cancel
);
560 * Handle expiry of the delay timer, indicating that a delayed slow work item
561 * should now be queued if not cancelled
563 static void delayed_slow_work_timer(unsigned long data
)
565 wait_queue_head_t
*wfo_wq
;
566 struct list_head
*queue
;
567 struct slow_work
*work
= (struct slow_work
*) data
;
569 bool queued
= false, put
= false, first
= false;
571 if (test_bit(SLOW_WORK_VERY_SLOW
, &work
->flags
)) {
572 wfo_wq
= &vslow_work_queue_waits_for_occupation
;
573 queue
= &vslow_work_queue
;
575 wfo_wq
= &slow_work_queue_waits_for_occupation
;
576 queue
= &slow_work_queue
;
579 spin_lock_irqsave(&slow_work_queue_lock
, flags
);
580 if (likely(!test_bit(SLOW_WORK_CANCELLING
, &work
->flags
))) {
581 clear_bit(SLOW_WORK_DELAYED
, &work
->flags
);
583 if (test_bit(SLOW_WORK_EXECUTING
, &work
->flags
)) {
584 /* we discard the reference the timer was holding in
585 * favour of the one the executor holds */
586 set_bit(SLOW_WORK_ENQ_DEFERRED
, &work
->flags
);
589 slow_work_mark_time(work
);
590 list_add_tail(&work
->link
, queue
);
592 if (work
->link
.prev
== queue
)
597 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
599 slow_work_put_ref(work
);
603 wake_up(&slow_work_thread_wq
);
607 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
608 * @dwork: The delayed work item to queue
609 * @delay: When to start executing the work, in jiffies from now
611 * This is similar to slow_work_enqueue(), but it adds a delay before the work
612 * is actually queued for processing.
614 * The item can have delayed processing requested on it whilst it is being
615 * executed. The delay will begin immediately, and if it expires before the
616 * item finishes executing, the item will be placed back on the queue when it
617 * has done executing.
619 int delayed_slow_work_enqueue(struct delayed_slow_work
*dwork
,
622 struct slow_work
*work
= &dwork
->work
;
627 return slow_work_enqueue(&dwork
->work
);
629 BUG_ON(slow_work_user_count
<= 0);
633 if (test_bit(SLOW_WORK_CANCELLING
, &work
->flags
))
636 if (!test_and_set_bit_lock(SLOW_WORK_PENDING
, &work
->flags
)) {
637 spin_lock_irqsave(&slow_work_queue_lock
, flags
);
639 if (test_bit(SLOW_WORK_CANCELLING
, &work
->flags
))
642 /* the timer holds a reference whilst it is pending */
643 ret
= work
->ops
->get_ref(work
);
647 if (test_and_set_bit(SLOW_WORK_DELAYED
, &work
->flags
))
649 dwork
->timer
.expires
= jiffies
+ delay
;
650 dwork
->timer
.data
= (unsigned long) work
;
651 dwork
->timer
.function
= delayed_slow_work_timer
;
652 add_timer(&dwork
->timer
);
654 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
662 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
665 EXPORT_SYMBOL(delayed_slow_work_enqueue
);
668 * Schedule a cull of the thread pool at some time in the near future
670 static void slow_work_schedule_cull(void)
672 mod_timer(&slow_work_cull_timer
,
673 round_jiffies(jiffies
+ SLOW_WORK_CULL_TIMEOUT
));
677 * Worker thread culling algorithm
679 static bool slow_work_cull_thread(void)
682 bool do_cull
= false;
684 spin_lock_irqsave(&slow_work_queue_lock
, flags
);
686 if (slow_work_cull
) {
687 slow_work_cull
= false;
689 if (list_empty(&slow_work_queue
) &&
690 list_empty(&vslow_work_queue
) &&
691 atomic_read(&slow_work_thread_count
) >
692 slow_work_min_threads
) {
693 slow_work_schedule_cull();
698 spin_unlock_irqrestore(&slow_work_queue_lock
, flags
);
703 * Determine if there is slow work available for dispatch
705 static inline bool slow_work_available(int vsmax
)
707 return !list_empty(&slow_work_queue
) ||
708 (!list_empty(&vslow_work_queue
) &&
709 atomic_read(&vslow_work_executing_count
) < vsmax
);
713 * Worker thread dispatcher
715 static int slow_work_thread(void *_data
)
722 set_user_nice(current
, -5);
724 /* allocate ourselves an ID */
725 spin_lock_irq(&slow_work_queue_lock
);
726 id
= find_first_zero_bit(slow_work_ids
, SLOW_WORK_THREAD_LIMIT
);
727 BUG_ON(id
< 0 || id
>= SLOW_WORK_THREAD_LIMIT
);
728 __set_bit(id
, slow_work_ids
);
729 slow_work_set_thread_pid(id
, current
->pid
);
730 spin_unlock_irq(&slow_work_queue_lock
);
732 sprintf(current
->comm
, "kslowd%03u", id
);
735 vsmax
= vslow_work_proportion
;
736 vsmax
*= atomic_read(&slow_work_thread_count
);
739 prepare_to_wait_exclusive(&slow_work_thread_wq
, &wait
,
741 if (!freezing(current
) &&
742 !slow_work_threads_should_exit
&&
743 !slow_work_available(vsmax
) &&
746 finish_wait(&slow_work_thread_wq
, &wait
);
750 vsmax
= vslow_work_proportion
;
751 vsmax
*= atomic_read(&slow_work_thread_count
);
754 if (slow_work_available(vsmax
) && slow_work_execute(id
)) {
756 if (list_empty(&slow_work_queue
) &&
757 list_empty(&vslow_work_queue
) &&
758 atomic_read(&slow_work_thread_count
) >
759 slow_work_min_threads
)
760 slow_work_schedule_cull();
764 if (slow_work_threads_should_exit
)
767 if (slow_work_cull
&& slow_work_cull_thread())
771 spin_lock_irq(&slow_work_queue_lock
);
772 slow_work_set_thread_pid(id
, 0);
773 __clear_bit(id
, slow_work_ids
);
774 spin_unlock_irq(&slow_work_queue_lock
);
776 if (atomic_dec_and_test(&slow_work_thread_count
))
777 complete_and_exit(&slow_work_last_thread_exited
, 0);
782 * Handle thread cull timer expiration
784 static void slow_work_cull_timeout(unsigned long data
)
786 slow_work_cull
= true;
787 wake_up(&slow_work_thread_wq
);
791 * Start a new slow work thread
793 static void slow_work_new_thread_execute(struct slow_work
*work
)
795 struct task_struct
*p
;
797 if (slow_work_threads_should_exit
)
800 if (atomic_read(&slow_work_thread_count
) >= slow_work_max_threads
)
803 if (!mutex_trylock(&slow_work_user_lock
))
806 slow_work_may_not_start_new_thread
= true;
807 atomic_inc(&slow_work_thread_count
);
808 p
= kthread_run(slow_work_thread
, NULL
, "kslowd");
810 printk(KERN_DEBUG
"Slow work thread pool: OOM\n");
811 if (atomic_dec_and_test(&slow_work_thread_count
))
812 BUG(); /* we're running on a slow work thread... */
813 mod_timer(&slow_work_oom_timer
,
814 round_jiffies(jiffies
+ SLOW_WORK_OOM_TIMEOUT
));
816 /* ratelimit the starting of new threads */
817 mod_timer(&slow_work_oom_timer
, jiffies
+ 1);
820 mutex_unlock(&slow_work_user_lock
);
823 static const struct slow_work_ops slow_work_new_thread_ops
= {
824 .owner
= THIS_MODULE
,
825 .execute
= slow_work_new_thread_execute
,
826 #ifdef CONFIG_SLOW_WORK_DEBUG
827 .desc
= slow_work_new_thread_desc
,
832 * post-OOM new thread start suppression expiration
834 static void slow_work_oom_timeout(unsigned long data
)
836 slow_work_may_not_start_new_thread
= false;
841 * Handle adjustment of the minimum number of threads
843 static int slow_work_min_threads_sysctl(struct ctl_table
*table
, int write
,
845 size_t *lenp
, loff_t
*ppos
)
847 int ret
= proc_dointvec_minmax(table
, write
, buffer
, lenp
, ppos
);
851 mutex_lock(&slow_work_user_lock
);
852 if (slow_work_user_count
> 0) {
853 /* see if we need to start or stop threads */
854 n
= atomic_read(&slow_work_thread_count
) -
855 slow_work_min_threads
;
857 if (n
< 0 && !slow_work_may_not_start_new_thread
)
858 slow_work_enqueue(&slow_work_new_thread
);
860 slow_work_schedule_cull();
862 mutex_unlock(&slow_work_user_lock
);
869 * Handle adjustment of the maximum number of threads
871 static int slow_work_max_threads_sysctl(struct ctl_table
*table
, int write
,
873 size_t *lenp
, loff_t
*ppos
)
875 int ret
= proc_dointvec_minmax(table
, write
, buffer
, lenp
, ppos
);
879 mutex_lock(&slow_work_user_lock
);
880 if (slow_work_user_count
> 0) {
881 /* see if we need to stop threads */
882 n
= slow_work_max_threads
-
883 atomic_read(&slow_work_thread_count
);
886 slow_work_schedule_cull();
888 mutex_unlock(&slow_work_user_lock
);
893 #endif /* CONFIG_SYSCTL */
896 * slow_work_register_user - Register a user of the facility
897 * @module: The module about to make use of the facility
899 * Register a user of the facility, starting up the initial threads if there
900 * aren't any other users at this point. This will return 0 if successful, or an error if not.
903 int slow_work_register_user(struct module
*module
)
905 struct task_struct
*p
;
908 mutex_lock(&slow_work_user_lock
);
910 if (slow_work_user_count
== 0) {
911 printk(KERN_NOTICE
"Slow work thread pool: Starting up\n");
912 init_completion(&slow_work_last_thread_exited
);
914 slow_work_threads_should_exit
= false;
915 slow_work_init(&slow_work_new_thread
,
916 &slow_work_new_thread_ops
);
917 slow_work_may_not_start_new_thread
= false;
918 slow_work_cull
= false;
920 /* start the minimum number of threads */
921 for (loop
= 0; loop
< slow_work_min_threads
; loop
++) {
922 atomic_inc(&slow_work_thread_count
);
923 p
= kthread_run(slow_work_thread
, NULL
, "kslowd");
927 printk(KERN_NOTICE
"Slow work thread pool: Ready\n");
930 slow_work_user_count
++;
931 mutex_unlock(&slow_work_user_lock
);
935 if (atomic_dec_and_test(&slow_work_thread_count
))
936 complete(&slow_work_last_thread_exited
);
938 printk(KERN_ERR
"Slow work thread pool:"
939 " Aborting startup on ENOMEM\n");
940 slow_work_threads_should_exit
= true;
941 wake_up_all(&slow_work_thread_wq
);
942 wait_for_completion(&slow_work_last_thread_exited
);
943 printk(KERN_ERR
"Slow work thread pool: Aborted\n");
945 mutex_unlock(&slow_work_user_lock
);
948 EXPORT_SYMBOL(slow_work_register_user
);
951 * wait for all outstanding items from the calling module to complete
952 * - note that more items may be queued whilst we're waiting
954 static void slow_work_wait_for_items(struct module
*module
)
956 #ifdef CONFIG_MODULES
957 DECLARE_WAITQUEUE(myself
, current
);
958 struct slow_work
*work
;
961 mutex_lock(&slow_work_unreg_sync_lock
);
962 add_wait_queue(&slow_work_unreg_wq
, &myself
);
965 spin_lock_irq(&slow_work_queue_lock
);
967 /* first of all, we wait for the last queued item in each list
969 list_for_each_entry_reverse(work
, &vslow_work_queue
, link
) {
970 if (work
->owner
== module
) {
971 set_current_state(TASK_UNINTERRUPTIBLE
);
972 slow_work_unreg_work_item
= work
;
976 list_for_each_entry_reverse(work
, &slow_work_queue
, link
) {
977 if (work
->owner
== module
) {
978 set_current_state(TASK_UNINTERRUPTIBLE
);
979 slow_work_unreg_work_item
= work
;
984 /* then we wait for the items being processed to finish */
985 slow_work_unreg_module
= module
;
987 for (loop
= 0; loop
< SLOW_WORK_THREAD_LIMIT
; loop
++) {
988 if (slow_work_thread_processing
[loop
] == module
)
991 spin_unlock_irq(&slow_work_queue_lock
);
992 break; /* okay, we're done */
995 spin_unlock_irq(&slow_work_queue_lock
);
997 slow_work_unreg_work_item
= NULL
;
998 slow_work_unreg_module
= NULL
;
1001 remove_wait_queue(&slow_work_unreg_wq
, &myself
);
1002 mutex_unlock(&slow_work_unreg_sync_lock
);
1003 #endif /* CONFIG_MODULES */
1007 * slow_work_unregister_user - Unregister a user of the facility
1008 * @module: The module whose items should be cleared
1010 * Unregister a user of the facility, killing all the threads if this was the
1013 * This waits for all the work items belonging to the nominated module to go
1014 * away before proceeding.
1016 void slow_work_unregister_user(struct module
*module
)
1018 /* first of all, wait for all outstanding items from the calling module
1021 slow_work_wait_for_items(module
);
1023 /* then we can actually go about shutting down the facility if need
1025 mutex_lock(&slow_work_user_lock
);
1027 BUG_ON(slow_work_user_count
<= 0);
1029 slow_work_user_count
--;
1030 if (slow_work_user_count
== 0) {
1031 printk(KERN_NOTICE
"Slow work thread pool: Shutting down\n");
1032 slow_work_threads_should_exit
= true;
1033 del_timer_sync(&slow_work_cull_timer
);
1034 del_timer_sync(&slow_work_oom_timer
);
1035 wake_up_all(&slow_work_thread_wq
);
1036 wait_for_completion(&slow_work_last_thread_exited
);
1037 printk(KERN_NOTICE
"Slow work thread pool:"
1038 " Shut down complete\n");
1041 mutex_unlock(&slow_work_user_lock
);
1043 EXPORT_SYMBOL(slow_work_unregister_user
);
1046 * Initialise the slow work facility
1048 static int __init
init_slow_work(void)
1050 unsigned nr_cpus
= num_possible_cpus();
1052 if (slow_work_max_threads
< nr_cpus
)
1053 slow_work_max_threads
= nr_cpus
;
1054 #ifdef CONFIG_SYSCTL
1055 if (slow_work_max_max_threads
< nr_cpus
* 2)
1056 slow_work_max_max_threads
= nr_cpus
* 2;
1058 #ifdef CONFIG_SLOW_WORK_DEBUG
1060 struct dentry
*dbdir
;
1062 dbdir
= debugfs_create_dir("slow_work", NULL
);
1063 if (dbdir
&& !IS_ERR(dbdir
))
1064 debugfs_create_file("runqueue", S_IFREG
| 0400, dbdir
,
1065 NULL
, &slow_work_runqueue_fops
);
1071 subsys_initcall(init_slow_work
);