/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 */
#include <linux/module.h>
#include <linux/slow-work.h>
#include <linux/kthread.h>
#include <linux/freezer.h>
#include <linux/wait.h>
#define SLOW_WORK_CULL_TIMEOUT (5 * HZ)	/* cull threads 5s after running out of
					 * things to do */
#define SLOW_WORK_OOM_TIMEOUT (5 * HZ)	/* can't start new threads for 5s after
					 * OOM */
static void slow_work_cull_timeout(unsigned long);
static void slow_work_oom_timeout(unsigned long);
#ifdef CONFIG_SYSCTL
static int slow_work_min_threads_sysctl(struct ctl_table *, int, struct file *,
					void __user *, size_t *, loff_t *);

static int slow_work_max_threads_sysctl(struct ctl_table *, int, struct file *,
					void __user *, size_t *, loff_t *);
#endif
/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
static unsigned vslow_work_proportion = 50; /* % of threads that may process
					     * very slow work */
#ifdef CONFIG_SYSCTL
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = 255;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;
ctl_table slow_work_sysctls[] = {
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "min-threads",
		.data		= &slow_work_min_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_min_threads_sysctl,
		.extra1		= (void *) &slow_work_min_min_threads,
		.extra2		= &slow_work_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "max-threads",
		.data		= &slow_work_max_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_max_threads_sysctl,
		.extra1		= &slow_work_min_threads,
		.extra2		= (void *) &slow_work_max_max_threads,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "vslow-percentage",
		.data		= &vslow_work_proportion,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.extra1		= (void *) &slow_work_min_vslow,
		.extra2		= (void *) &slow_work_max_vslow,
	},
	{ .ctl_name = 0 }
};
#endif
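/*
 * Illustrative only: this table is not registered here, but assuming it is
 * hooked up under /proc/sys/kernel/slow-work (e.g. from kernel/sysctl.c), the
 * pool could then be tuned at runtime with something like:
 *
 *	echo 3  >/proc/sys/kernel/slow-work/min-threads
 *	echo 8  >/proc/sys/kernel/slow-work/max-threads
 *	echo 25 >/proc/sys/kernel/slow-work/vslow-percentage
 */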
/*
 * The active state of the thread pool
 */
static atomic_t slow_work_thread_count;
static atomic_t vslow_work_executing_count;

static bool slow_work_may_not_start_new_thread;
static bool slow_work_cull; /* cull a thread due to lack of activity */
static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
static struct slow_work slow_work_new_thread; /* new thread starter */
/*
 * The queues of work items and the lock governing access to them.  These are
 * shared between all the CPUs.  It doesn't make sense to have per-CPU queues
 * as the number of threads bears no relation to the number of CPUs.
 *
 * There are two queues of work items: one for slow work items, and one for
 * very slow work items.
 */
static LIST_HEAD(slow_work_queue);
static LIST_HEAD(vslow_work_queue);
static DEFINE_SPINLOCK(slow_work_queue_lock);
/*
 * The thread controls.  A variable used to signal to the threads that they
 * should exit when the queue is empty, a waitqueue used by the threads to wait
 * for signals, and a completion set by the last thread to exit.
 */
static bool slow_work_threads_should_exit;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
static DECLARE_COMPLETION(slow_work_last_thread_exited);
/*
 * The number of users of the thread pool and its lock.  Whilst this is zero we
 * have no threads hanging around, and when this reaches zero, we wait for all
 * active or queued work items to complete and kill all the threads we do have.
 */
static int slow_work_user_count;
static DEFINE_MUTEX(slow_work_user_lock);
/*
 * Calculate the maximum number of active threads in the pool that are
 * permitted to process very slow work items.
 *
 * The answer is rounded up to at least 1, but may not equal or exceed the
 * maximum number of the threads in the pool.  This means we always have at
 * least one thread that can process slow work items, and we always have at
 * least one thread that won't get tied up doing so.
 */
static unsigned slow_work_calc_vsmax(void)
{
	unsigned vsmax;

	vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
	vsmax /= 100;
	vsmax = max(vsmax, 1U);
	return min(vsmax, slow_work_max_threads - 1);
}
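/*
 * Worked example (illustrative): with 4 threads in the pool and the default
 * vslow_work_proportion of 50, vsmax = 4 * 50 / 100 = 2.  With 2 threads at a
 * proportion of 25 the raw result would be 0, which max() rounds up to 1, and
 * min() caps the result at slow_work_max_threads - 1 so that at least one
 * thread is never tied up in very slow work.
 */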
/*
 * Attempt to execute stuff queued on a slow thread.  Return true if we managed
 * it, false if there was nothing to do.
 */
static bool slow_work_execute(void)
{
	struct slow_work *work = NULL;
	unsigned vsmax;
	bool very_slow;

	vsmax = slow_work_calc_vsmax();

	/* see if we can schedule a new thread to be started if we're not
	 * keeping up with the work */
	if (!waitqueue_active(&slow_work_thread_wq) &&
	    (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
	    atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
	    !slow_work_may_not_start_new_thread)
		slow_work_enqueue(&slow_work_new_thread);

	/* find something to execute */
	spin_lock_irq(&slow_work_queue_lock);
	if (!list_empty(&vslow_work_queue) &&
	    atomic_read(&vslow_work_executing_count) < vsmax) {
		work = list_entry(vslow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		atomic_inc(&vslow_work_executing_count);
		very_slow = true;
	} else if (!list_empty(&slow_work_queue)) {
		work = list_entry(slow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		very_slow = false;
	} else {
		very_slow = false; /* avoid the compiler warning */
	}
	spin_unlock_irq(&slow_work_queue_lock);

	if (!work)
		return false;

	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
		BUG();

	work->ops->execute(work);

	if (very_slow)
		atomic_dec(&vslow_work_executing_count);
	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);

	/* if someone tried to enqueue the item whilst we were executing it,
	 * then it'll be left unenqueued to avoid multiple threads trying to
	 * execute it simultaneously
	 *
	 * there is, however, a race between us testing the pending flag and
	 * getting the spinlock, and between the enqueuer setting the pending
	 * flag and getting the spinlock, so we use a deferral bit to tell us
	 * if the enqueuer got there first
	 */
	if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irq(&slow_work_queue_lock);

		if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
		    test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
			goto auto_requeue;

		spin_unlock_irq(&slow_work_queue_lock);
	}

	work->ops->put_ref(work);
	return true;

auto_requeue:
	/* we must complete the enqueue operation
	 * - we transfer our ref on the item back to the appropriate queue
	 * - don't wake another thread up as we're awake already
	 */
	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
		list_add_tail(&work->link, &vslow_work_queue);
	else
		list_add_tail(&work->link, &slow_work_queue);
	spin_unlock_irq(&slow_work_queue_lock);
	return true;
}
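/*
 * One possible interleaving of the deferral protocol above, for illustration
 * (A is the kslowd thread executing the item, B is an enqueuer):
 *
 *	A: runs work->ops->execute(work); PENDING has already been cleared
 *	B: test_and_set_bit_lock(PENDING) - was clear, so B may queue the item
 *	B: takes the queue lock, sees EXECUTING set, sets ENQ_DEFERRED instead
 *	A: execute() returns; A clears EXECUTING
 *	A: sees PENDING set, takes the queue lock, finds ENQ_DEFERRED set and
 *	   EXECUTING clear, and so requeues the item itself (auto_requeue)
 */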
/**
 * slow_work_enqueue - Schedule a slow work item for processing
 * @work: The work item to queue
 *
 * Schedule a slow work item for processing.  If the item is already undergoing
 * execution, this guarantees not to re-enter the execution routine until the
 * first execution finishes.
 *
 * The item is pinned by this function as it retains a reference to it, managed
 * through the item operations.  The item is unpinned once it has been
 * executed.
 *
 * An item may hog the thread that is running it for a relatively large amount
 * of time, sufficient, for example, to perform several lookup, mkdir, create
 * and setxattr operations.  It may sleep on I/O and may sleep to obtain locks.
 *
 * Conversely, if a number of items are awaiting processing, it may take some
 * time before any given item is given attention.  The number of threads in the
 * pool may be increased to deal with demand, but only up to a limit.
 *
 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
 * the very slow queue, from which only a portion of the threads will be
 * allowed to pick items to execute.  This ensures that very slow items won't
 * overly block ones that are just ordinarily slow.
 *
 * Returns 0 if successful, -EAGAIN if not.
 */
int slow_work_enqueue(struct slow_work *work)
{
	unsigned long flags;

	BUG_ON(slow_work_user_count <= 0);
	BUG_ON(!work);
	BUG_ON(!work->ops);
	BUG_ON(!work->ops->get_ref);

	/* when honouring an enqueue request, we only promise that we will run
	 * the work function in the future; we do not promise to run it once
	 * per enqueue request
	 *
	 * we use the PENDING bit to merge together repeat requests without
	 * having to disable IRQs and take the spinlock, whilst still
	 * maintaining our promise
	 */
	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irqsave(&slow_work_queue_lock, flags);

		/* we promise that we will not attempt to execute the work
		 * function in more than one thread simultaneously
		 *
		 * this, however, leaves us with a problem if we're asked to
		 * enqueue the work whilst someone is executing the work
		 * function as simply queueing the work immediately means that
		 * another thread may try executing it whilst it is already
		 * under execution
		 *
		 * to deal with this, we set the ENQ_DEFERRED bit instead of
		 * enqueueing, and the thread currently executing the work
		 * function will enqueue the work item when the work function
		 * returns and it has cleared the EXECUTING bit
		 */
		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
		} else {
			if (work->ops->get_ref(work) < 0)
				goto cant_get_ref;
			if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
				list_add_tail(&work->link, &vslow_work_queue);
			else
				list_add_tail(&work->link, &slow_work_queue);
			wake_up(&slow_work_thread_wq);
		}

		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	}
	return 0;

cant_get_ref:
	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return -EAGAIN;
}
EXPORT_SYMBOL(slow_work_enqueue);
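/*
 * Illustrative sketch only (not part of this facility): a minimal user of the
 * API above, with a made-up struct my_object that embeds a struct slow_work.
 * The get_ref/put_ref hooks are where a real user would take and drop a
 * reference on the containing object so it cannot be freed whilst queued or
 * executing.
 */
#if 0
struct my_object {
	struct slow_work work;	/* initialised with slow_work_init() */
	atomic_t usage;
};

static int my_object_get_ref(struct slow_work *work)
{
	struct my_object *obj = container_of(work, struct my_object, work);

	atomic_inc(&obj->usage);
	return 0;
}

static void my_object_put_ref(struct slow_work *work)
{
	struct my_object *obj = container_of(work, struct my_object, work);

	atomic_dec(&obj->usage);	/* real code would free on last put */
}

static void my_object_execute(struct slow_work *work)
{
	/* runs in process context on a kslowd thread; may sleep */
}

static const struct slow_work_ops my_object_ops = {
	.get_ref	= my_object_get_ref,
	.put_ref	= my_object_put_ref,
	.execute	= my_object_execute,
};

/* setup: slow_work_register_user(), then slow_work_init(&obj->work,
 * &my_object_ops), then slow_work_enqueue(&obj->work) as required */
#endif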
/*
 * Worker thread culling algorithm
 */
static bool slow_work_cull_thread(void)
{
	unsigned long flags;
	bool do_cull = false;

	spin_lock_irqsave(&slow_work_queue_lock, flags);

	if (slow_work_cull) {
		slow_work_cull = false;

		if (list_empty(&slow_work_queue) &&
		    list_empty(&vslow_work_queue) &&
		    atomic_read(&slow_work_thread_count) >
		    slow_work_min_threads) {
			mod_timer(&slow_work_cull_timer,
				  jiffies + SLOW_WORK_CULL_TIMEOUT);
			do_cull = true;
		}
	}

	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return do_cull;
}
/*
 * Determine if there is slow work available for dispatch
 */
static inline bool slow_work_available(int vsmax)
{
	return !list_empty(&slow_work_queue) ||
		(!list_empty(&vslow_work_queue) &&
		 atomic_read(&vslow_work_executing_count) < vsmax);
}
/*
 * Worker thread dispatcher
 */
static int slow_work_thread(void *_data)
{
	int vsmax;

	DEFINE_WAIT(wait);

	set_freezable();
	set_user_nice(current, -5);

	for (;;) {
		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		prepare_to_wait(&slow_work_thread_wq, &wait,
				TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !slow_work_threads_should_exit &&
		    !slow_work_available(vsmax) &&
		    !slow_work_cull)
			schedule();
		finish_wait(&slow_work_thread_wq, &wait);

		try_to_freeze();

		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		if (slow_work_available(vsmax) && slow_work_execute()) {
			cond_resched();
			if (list_empty(&slow_work_queue) &&
			    list_empty(&vslow_work_queue) &&
			    atomic_read(&slow_work_thread_count) >
			    slow_work_min_threads)
				mod_timer(&slow_work_cull_timer,
					  jiffies + SLOW_WORK_CULL_TIMEOUT);
			continue;
		}

		if (slow_work_threads_should_exit)
			break;

		if (slow_work_cull && slow_work_cull_thread())
			break;
	}

	if (atomic_dec_and_test(&slow_work_thread_count))
		complete_and_exit(&slow_work_last_thread_exited, 0);
	return 0;
}
/*
 * Handle thread cull timer expiration
 */
static void slow_work_cull_timeout(unsigned long data)
{
	slow_work_cull = true;
	wake_up(&slow_work_thread_wq);
}
/*
 * Get a reference on slow work thread starter
 */
static int slow_work_new_thread_get_ref(struct slow_work *work)
{
	return 0;
}

/*
 * Drop a reference on slow work thread starter
 */
static void slow_work_new_thread_put_ref(struct slow_work *work)
{
}
/*
 * Start a new slow work thread
 */
static void slow_work_new_thread_execute(struct slow_work *work)
{
	struct task_struct *p;

	if (slow_work_threads_should_exit)
		return;

	if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
		return;

	if (!mutex_trylock(&slow_work_user_lock))
		return;

	slow_work_may_not_start_new_thread = true;
	atomic_inc(&slow_work_thread_count);
	p = kthread_run(slow_work_thread, NULL, "kslowd");
	if (IS_ERR(p)) {
		printk(KERN_DEBUG "Slow work thread pool: OOM\n");
		if (atomic_dec_and_test(&slow_work_thread_count))
			BUG(); /* we're running on a slow work thread... */
		mod_timer(&slow_work_oom_timer,
			  jiffies + SLOW_WORK_OOM_TIMEOUT);
	} else {
		/* ratelimit the starting of new threads */
		mod_timer(&slow_work_oom_timer, jiffies + 1);
	}

	mutex_unlock(&slow_work_user_lock);
}
static const struct slow_work_ops slow_work_new_thread_ops = {
	.get_ref	= slow_work_new_thread_get_ref,
	.put_ref	= slow_work_new_thread_put_ref,
	.execute	= slow_work_new_thread_execute,
};
/*
 * post-OOM new thread start suppression expiration
 */
static void slow_work_oom_timeout(unsigned long data)
{
	slow_work_may_not_start_new_thread = false;
}
#ifdef CONFIG_SYSCTL
/*
 * Handle adjustment of the minimum number of threads
 */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					struct file *filp, void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, filp, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to start or stop threads */
			n = atomic_read(&slow_work_thread_count) -
				slow_work_min_threads;

			if (n < 0 && !slow_work_may_not_start_new_thread)
				slow_work_enqueue(&slow_work_new_thread);
			else if (n > 0)
				mod_timer(&slow_work_cull_timer,
					  jiffies + SLOW_WORK_CULL_TIMEOUT);
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}
/*
 * Handle adjustment of the maximum number of threads
 */
static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					struct file *filp, void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, filp, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to stop threads */
			n = slow_work_max_threads -
				atomic_read(&slow_work_thread_count);

			if (n < 0)
				mod_timer(&slow_work_cull_timer,
					  jiffies + SLOW_WORK_CULL_TIMEOUT);
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}
#endif /* CONFIG_SYSCTL */
/**
 * slow_work_register_user - Register a user of the facility
 *
 * Register a user of the facility, starting up the initial threads if there
 * aren't any other users at this point.  This will return 0 if successful, or
 * an error if not.
 */
int slow_work_register_user(void)
{
	struct task_struct *p;
	int loop;

	mutex_lock(&slow_work_user_lock);

	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
		init_completion(&slow_work_last_thread_exited);

		slow_work_threads_should_exit = false;
		slow_work_init(&slow_work_new_thread,
			       &slow_work_new_thread_ops);
		slow_work_may_not_start_new_thread = false;
		slow_work_cull = false;

		/* start the minimum number of threads */
		for (loop = 0; loop < slow_work_min_threads; loop++) {
			atomic_inc(&slow_work_thread_count);
			p = kthread_run(slow_work_thread, NULL, "kslowd");
			if (IS_ERR(p))
				goto error;
		}
		printk(KERN_NOTICE "Slow work thread pool: Ready\n");
	}

	slow_work_user_count++;
	mutex_unlock(&slow_work_user_lock);
	return 0;

error:
	if (atomic_dec_and_test(&slow_work_thread_count))
		complete(&slow_work_last_thread_exited);
	if (loop > 0) {
		printk(KERN_ERR "Slow work thread pool:"
		       " Aborting startup on ENOMEM\n");
		slow_work_threads_should_exit = true;
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_ERR "Slow work thread pool: Aborted\n");
	}
	mutex_unlock(&slow_work_user_lock);
	return PTR_ERR(p);
}
EXPORT_SYMBOL(slow_work_register_user);
/**
 * slow_work_unregister_user - Unregister a user of the facility
 *
 * Unregister a user of the facility, killing all the threads if this was the
 * last one.
 */
void slow_work_unregister_user(void)
{
	mutex_lock(&slow_work_user_lock);

	BUG_ON(slow_work_user_count <= 0);

	slow_work_user_count--;
	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
		slow_work_threads_should_exit = true;
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_NOTICE "Slow work thread pool:"
		       " Shut down complete\n");
	}

	del_timer_sync(&slow_work_cull_timer);

	mutex_unlock(&slow_work_user_lock);
}
EXPORT_SYMBOL(slow_work_unregister_user);
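/*
 * Illustrative sketch only: a module that uses the pool brackets its usage
 * with register/unregister, typically from its init/exit paths.  The names
 * here are hypothetical.
 */
#if 0
static int __init my_module_init(void)
{
	int ret = slow_work_register_user();
	if (ret < 0)
		return ret;
	/* ... now safe to call slow_work_enqueue() ... */
	return 0;
}

static void __exit my_module_exit(void)
{
	/* all of this module's work items must be complete or cancelled
	 * before dropping the last user reference */
	slow_work_unregister_user();
}
#endif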
/*
 * Initialise the slow work facility
 */
static int __init init_slow_work(void)
{
	unsigned nr_cpus = num_possible_cpus();

	if (slow_work_max_threads < nr_cpus)
		slow_work_max_threads = nr_cpus;
#ifdef CONFIG_SYSCTL
	if (slow_work_max_max_threads < nr_cpus * 2)
		slow_work_max_max_threads = nr_cpus * 2;
#endif
	return 0;
}

subsys_initcall(init_slow_work);