/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 *
 * See Documentation/slow-work.txt
 */

#include <linux/module.h>
#include <linux/slow-work.h>
#include <linux/kthread.h>
#include <linux/freezer.h>
#include <linux/wait.h>
#include <linux/debugfs.h>
#include "slow-work.h"

static void slow_work_cull_timeout(unsigned long);
static void slow_work_oom_timeout(unsigned long);

#ifdef CONFIG_SYSCTL
static int slow_work_min_threads_sysctl(struct ctl_table *, int,
					void __user *, size_t *, loff_t *);

static int slow_work_max_threads_sysctl(struct ctl_table *, int,
					void __user *, size_t *, loff_t *);
#endif

/*
 * The pool of threads has at least min threads in it as long as someone is
 * using the facility, and may have as many as max.
 *
 * A portion of the pool may be processing very slow operations.
 */
static unsigned slow_work_min_threads = 2;
static unsigned slow_work_max_threads = 4;
static unsigned vslow_work_proportion = 50; /* % of threads that may process
					     * very slow work */

#ifdef CONFIG_SYSCTL
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;

ctl_table slow_work_sysctls[] = {
	{
		.procname	= "min-threads",
		.data		= &slow_work_min_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_min_threads_sysctl,
		.extra1		= (void *) &slow_work_min_min_threads,
		.extra2		= &slow_work_max_threads,
	},
	{
		.procname	= "max-threads",
		.data		= &slow_work_max_threads,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= slow_work_max_threads_sysctl,
		.extra1		= &slow_work_min_threads,
		.extra2		= (void *) &slow_work_max_max_threads,
	},
	{
		.procname	= "vslow-percentage",
		.data		= &vslow_work_proportion,
		.maxlen		= sizeof(unsigned),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= (void *) &slow_work_min_vslow,
		.extra2		= (void *) &slow_work_max_vslow,
	},
	{}
};
#endif
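
/*
 * Illustrative sketch (not part of this file): the table above is not
 * registered here; at the time it was hooked into the "kernel" sysctl
 * directory elsewhere (kernel/sysctl.c), roughly as below, so the tunables
 * show up as /proc/sys/kernel/slow-work/{min-threads,max-threads,
 * vslow-percentage}.  The exact surrounding entry is an assumption.
 */
#if 0
	{
		.procname	= "slow-work",
		.mode		= 0555,
		.child		= slow_work_sysctls,
	},
#endif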

/*
 * The active state of the thread pool
 */
static atomic_t slow_work_thread_count;
static atomic_t vslow_work_executing_count;

static bool slow_work_may_not_start_new_thread;
static bool slow_work_cull;		/* cull a thread due to lack of activity */
static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
static struct slow_work slow_work_new_thread; /* new thread starter */

/*
 * slow work ID allocation (use slow_work_queue_lock)
 */
static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);

/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);

static void slow_work_set_thread_processing(int id, struct slow_work *work)
{
	if (work)
		slow_work_thread_processing[id] = work->owner;
}
static void slow_work_done_thread_processing(int id, struct slow_work *work)
{
	struct module *module = slow_work_thread_processing[id];

	slow_work_thread_processing[id] = NULL;

	if (slow_work_unreg_work_item == work ||
	    slow_work_unreg_module == module)
		wake_up_all(&slow_work_unreg_wq);
}
static void slow_work_clear_thread_processing(int id)
{
	slow_work_thread_processing[id] = NULL;
}
#else
static void slow_work_set_thread_processing(int id, struct slow_work *work) {}
static void slow_work_done_thread_processing(int id, struct slow_work *work) {}
static void slow_work_clear_thread_processing(int id) {}
#endif

/*
 * Data for tracking currently executing items for indication through /proc
 */
#ifdef CONFIG_SLOW_WORK_DEBUG
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif

/*
 * The queues of work items and the lock governing access to them. These are
 * shared between all the CPUs. It doesn't make sense to have per-CPU queues
 * as the number of threads bears no relation to the number of CPUs.
 *
 * There are two queues of work items: one for slow work items, and one for
 * very slow work items.
 */
LIST_HEAD(slow_work_queue);
LIST_HEAD(vslow_work_queue);
DEFINE_SPINLOCK(slow_work_queue_lock);

/*
 * The following are two wait queues that get pinged when a work item is placed
 * on an empty queue. These allow work items that are hogging a thread by
 * sleeping in a way that could be deferred to yield their thread and enqueue
 * themselves.
 */
static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);

/*
 * The thread controls. A variable used to signal to the threads that they
 * should exit when the queue is empty, a waitqueue used by the threads to wait
 * for signals, and a completion set by the last thread to exit.
 */
static bool slow_work_threads_should_exit;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
static DECLARE_COMPLETION(slow_work_last_thread_exited);

/*
 * The number of users of the thread pool and its lock. Whilst this is zero we
 * have no threads hanging around, and when this reaches zero, we wait for all
 * active or queued work items to complete and kill all the threads we do have.
 */
static int slow_work_user_count;
static DEFINE_MUTEX(slow_work_user_lock);

static inline int slow_work_get_ref(struct slow_work *work)
{
	if (work->ops->get_ref)
		return work->ops->get_ref(work);

	return 0;
}

static inline void slow_work_put_ref(struct slow_work *work)
{
	if (work->ops->put_ref)
		work->ops->put_ref(work);
}

/*
 * Calculate the maximum number of active threads in the pool that are
 * permitted to process very slow work items.
 *
 * The answer is rounded up to at least 1, but may not equal or exceed the
 * maximum number of the threads in the pool. This means we always have at
 * least one thread that can process slow work items, and we always have at
 * least one thread that won't get tied up doing so.
 */
static unsigned slow_work_calc_vsmax(void)
{
	unsigned vsmax;

	vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
	vsmax /= 100;
	vsmax = max(vsmax, 1U);
	return min(vsmax, slow_work_max_threads - 1);
}
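
/*
 * Worked example (illustrative): with 10 threads in the pool and the default
 * vslow-percentage of 50, vsmax = 10 * 50 / 100 = 5; with 2 threads the raw
 * result is 1, which the max() clamp preserves, and the min() against
 * slow_work_max_threads - 1 ensures at least one thread is always left free
 * for ordinarily slow items.
 */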

/*
 * Attempt to execute stuff queued on a slow thread. Return true if we managed
 * it, false if there was nothing to do.
 */
static noinline bool slow_work_execute(int id)
{
	struct slow_work *work = NULL;
	unsigned vsmax;
	bool very_slow;

	vsmax = slow_work_calc_vsmax();

	/* see if we can schedule a new thread to be started if we're not
	 * keeping up with the work */
	if (!waitqueue_active(&slow_work_thread_wq) &&
	    (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
	    atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
	    !slow_work_may_not_start_new_thread)
		slow_work_enqueue(&slow_work_new_thread);

	/* find something to execute */
	spin_lock_irq(&slow_work_queue_lock);
	if (!list_empty(&vslow_work_queue) &&
	    atomic_read(&vslow_work_executing_count) < vsmax) {
		work = list_entry(vslow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		atomic_inc(&vslow_work_executing_count);
		very_slow = true;
	} else if (!list_empty(&slow_work_queue)) {
		work = list_entry(slow_work_queue.next,
				  struct slow_work, link);
		if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
			BUG();
		list_del_init(&work->link);
		very_slow = false;
	} else {
		very_slow = false; /* avoid the compiler warning */
	}

	slow_work_set_thread_processing(id, work);
	if (work) {
		slow_work_mark_time(work);
		slow_work_begin_exec(id, work);
	}

	spin_unlock_irq(&slow_work_queue_lock);

	if (!work)
		return false;

	if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
		BUG();

	/* don't execute if the work is in the process of being cancelled */
	if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
		work->ops->execute(work);

	if (very_slow)
		atomic_dec(&vslow_work_executing_count);
	clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);

	/* wake up anyone waiting for this work to be complete */
	wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);

	slow_work_end_exec(id, work);

	/* if someone tried to enqueue the item whilst we were executing it,
	 * then it'll be left unenqueued to avoid multiple threads trying to
	 * execute it simultaneously
	 *
	 * there is, however, a race between us testing the pending flag and
	 * getting the spinlock, and between the enqueuer setting the pending
	 * flag and getting the spinlock, so we use a deferral bit to tell us
	 * if the enqueuer got there first
	 */
	if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irq(&slow_work_queue_lock);

		if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
		    test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
			goto auto_requeue;

		spin_unlock_irq(&slow_work_queue_lock);
	}

	/* sort out the race between module unloading and put_ref() */
	slow_work_put_ref(work);
	slow_work_done_thread_processing(id, work);

	return true;

auto_requeue:
	/* we must complete the enqueue operation
	 * - we transfer our ref on the item back to the appropriate queue
	 * - don't wake another thread up as we're awake already
	 */
	slow_work_mark_time(work);
	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
		list_add_tail(&work->link, &vslow_work_queue);
	else
		list_add_tail(&work->link, &slow_work_queue);
	spin_unlock_irq(&slow_work_queue_lock);
	slow_work_clear_thread_processing(id);
	return true;
}

/**
 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
 * @work: The work item under execution that wants to sleep
 * @_timeout: Scheduler sleep timeout
 *
 * Allow a requeueable work item to sleep on a slow-work processor thread until
 * that thread is needed to do some other work or the sleep is interrupted by
 * some other event.
 *
 * The caller must set up a wake up event before calling this and must have set
 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
 * condition before calling this function as no test is made here.
 *
 * False is returned if there is nothing on the queue; true is returned if the
 * work item should be requeued
 */
bool slow_work_sleep_till_thread_needed(struct slow_work *work,
					signed long *_timeout)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;

	DEFINE_WAIT(wait);

	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
		wfo_wq = &vslow_work_queue_waits_for_occupation;
		queue = &vslow_work_queue;
	} else {
		wfo_wq = &slow_work_queue_waits_for_occupation;
		queue = &slow_work_queue;
	}

	if (!list_empty(queue))
		return true;

	add_wait_queue_exclusive(wfo_wq, &wait);
	if (list_empty(queue))
		*_timeout = schedule_timeout(*_timeout);
	finish_wait(wfo_wq, &wait);

	return !list_empty(queue);
}
EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
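
/*
 * Illustrative sketch (not part of this file) of the calling convention
 * described above: the caller sets up its own wake-up event and sleep state
 * first, then lets this function decide whether the thread must be yielded.
 * The waitqueue, condition and object names are hypothetical.
 */
#if 0
	DEFINE_WAIT(wait);
	signed long timeout = 2 * HZ;
	bool requeue = false;

	prepare_to_wait(&my_event_wq, &wait, TASK_UNINTERRUPTIBLE);
	if (!my_event_has_happened(obj))
		requeue = slow_work_sleep_till_thread_needed(&obj->work,
							     &timeout);
	finish_wait(&my_event_wq, &wait);

	if (requeue)
		return;	/* give the thread back; re-enqueue to resume later */
#endif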

/**
 * slow_work_enqueue - Schedule a slow work item for processing
 * @work: The work item to queue
 *
 * Schedule a slow work item for processing. If the item is already undergoing
 * execution, this guarantees not to re-enter the execution routine until the
 * first execution finishes.
 *
 * The item is pinned by this function as it retains a reference to it, managed
 * through the item operations. The item is unpinned once it has been
 * executed.
 *
 * An item may hog the thread that is running it for a relatively large amount
 * of time, sufficient, for example, to perform several lookup, mkdir, create
 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
 *
 * Conversely, if a number of items are awaiting processing, it may take some
 * time before any given item is given attention. The number of threads in the
 * pool may be increased to deal with demand, but only up to a limit.
 *
 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
 * the very slow queue, from which only a portion of the threads will be
 * allowed to pick items to execute. This ensures that very slow items won't
 * overly block ones that are just ordinarily slow.
 *
 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if an attempt is
 * made to queue cancelled work).
 */
int slow_work_enqueue(struct slow_work *work)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;
	unsigned long flags;
	int ret;

	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
		return -ECANCELED;

	BUG_ON(slow_work_user_count <= 0);
	BUG_ON(!work);
	BUG_ON(!work->ops);

	/* when honouring an enqueue request, we only promise that we will run
	 * the work function in the future; we do not promise to run it once
	 * per enqueue request
	 *
	 * we use the PENDING bit to merge together repeat requests without
	 * having to disable IRQs and take the spinlock, whilst still
	 * maintaining our promise
	 */
	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
		if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
			wfo_wq = &vslow_work_queue_waits_for_occupation;
			queue = &vslow_work_queue;
		} else {
			wfo_wq = &slow_work_queue_waits_for_occupation;
			queue = &slow_work_queue;
		}

		spin_lock_irqsave(&slow_work_queue_lock, flags);

		if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
			goto cancelled;

		/* we promise that we will not attempt to execute the work
		 * function in more than one thread simultaneously
		 *
		 * this, however, leaves us with a problem if we're asked to
		 * enqueue the work whilst someone is executing the work
		 * function as simply queueing the work immediately means that
		 * another thread may try executing it whilst it is already
		 * under execution
		 *
		 * to deal with this, we set the ENQ_DEFERRED bit instead of
		 * enqueueing, and the thread currently executing the work
		 * function will enqueue the work item when the work function
		 * returns and it has cleared the EXECUTING bit
		 */
		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
		} else {
			ret = slow_work_get_ref(work);
			if (ret < 0)
				goto failed;
			slow_work_mark_time(work);
			list_add_tail(&work->link, queue);
			wake_up(&slow_work_thread_wq);

			/* if someone who could be requeued is sleeping on a
			 * thread, then ask them to yield their thread */
			if (work->link.prev == queue)
				wake_up(wfo_wq);
		}

		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	}
	return 0;

cancelled:
	ret = -ECANCELED;
failed:
	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return ret;
}
EXPORT_SYMBOL(slow_work_enqueue);
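
/*
 * Illustrative sketch (not part of this file): a minimal client of
 * slow_work_enqueue().  The object, ops and helper names below are
 * hypothetical; slow_work_init() and struct slow_work_ops come from
 * <linux/slow-work.h>.  Kept out of the build deliberately.
 */
#if 0
struct my_object {
	struct slow_work	work;
	struct kref		ref;
};

static int my_work_get_ref(struct slow_work *work)
{
	kref_get(&container_of(work, struct my_object, work)->ref);
	return 0;
}

static void my_work_put_ref(struct slow_work *work)
{
	kref_put(&container_of(work, struct my_object, work)->ref,
		 my_object_release);
}

static void my_work_execute(struct slow_work *work)
{
	/* runs on a kslowd thread; may sleep for a long time */
}

static const struct slow_work_ops my_work_ops = {
	.owner		= THIS_MODULE,
	.get_ref	= my_work_get_ref,
	.put_ref	= my_work_put_ref,
	.execute	= my_work_execute,
};

	/* when setting up the object and later kicking off the work: */
	slow_work_init(&obj->work, &my_work_ops);
	ret = slow_work_enqueue(&obj->work);
#endif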

static int slow_work_wait(void *word)
{
	schedule();
	return 0;
}

/**
 * slow_work_cancel - Cancel a slow work item
 * @work: The work item to cancel
 *
 * This function will cancel a previously enqueued work item. If we cannot
 * cancel the work item, it is guaranteed to have run when this function
 * returns.
 */
void slow_work_cancel(struct slow_work *work)
{
	bool wait = true, put = false;

	set_bit(SLOW_WORK_CANCELLING, &work->flags);
	smp_mb();

	/* if the work item is a delayed work item with an active timer, we
	 * need to wait for the timer to finish _before_ getting the spinlock,
	 * lest we deadlock against the timer routine
	 *
	 * the timer routine will leave DELAYED set if it notices the
	 * CANCELLING flag in time
	 */
	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
		struct delayed_slow_work *dwork =
			container_of(work, struct delayed_slow_work, work);
		del_timer_sync(&dwork->timer);
	}

	spin_lock_irq(&slow_work_queue_lock);

	if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
		/* the timer routine aborted or never happened, so we are left
		 * holding the timer's reference on the item and should just
		 * drop the pending flag and wait for any ongoing execution to
		 * finish */
		struct delayed_slow_work *dwork =
			container_of(work, struct delayed_slow_work, work);

		BUG_ON(timer_pending(&dwork->timer));
		BUG_ON(!list_empty(&work->link));

		clear_bit(SLOW_WORK_DELAYED, &work->flags);
		put = true;
		clear_bit(SLOW_WORK_PENDING, &work->flags);

	} else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
		   !list_empty(&work->link)) {
		/* the link in the pending queue holds a reference on the item
		 * that we will need to release */
		list_del_init(&work->link);
		wait = false;
		put = true;
		clear_bit(SLOW_WORK_PENDING, &work->flags);

	} else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
		/* the executor is holding our only reference on the item, so
		 * we merely need to wait for it to finish executing */
		clear_bit(SLOW_WORK_PENDING, &work->flags);
	}

	spin_unlock_irq(&slow_work_queue_lock);

	/* the EXECUTING flag is set by the executor whilst the spinlock is set
	 * and before the item is dequeued - so assuming the above doesn't
	 * actually dequeue it, simply waiting for the EXECUTING flag to be
	 * released here should be sufficient */
	if (wait)
		wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
			    TASK_UNINTERRUPTIBLE);

	clear_bit(SLOW_WORK_CANCELLING, &work->flags);
	if (put)
		slow_work_put_ref(work);
}
EXPORT_SYMBOL(slow_work_cancel);
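
/*
 * Illustrative sketch (not part of this file): a typical teardown path for a
 * hypothetical object.  Because slow_work_cancel() only returns once the item
 * is neither queued nor executing, the caller can safely drop its own
 * reference afterwards.
 */
#if 0
	slow_work_cancel(&obj->work);
	kref_put(&obj->ref, my_object_release);
#endif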

/*
 * Handle expiry of the delay timer, indicating that a delayed slow work item
 * should now be queued if not cancelled
 */
static void delayed_slow_work_timer(unsigned long data)
{
	wait_queue_head_t *wfo_wq;
	struct list_head *queue;
	struct slow_work *work = (struct slow_work *) data;
	unsigned long flags;
	bool queued = false, put = false, first = false;

	if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
		wfo_wq = &vslow_work_queue_waits_for_occupation;
		queue = &vslow_work_queue;
	} else {
		wfo_wq = &slow_work_queue_waits_for_occupation;
		queue = &slow_work_queue;
	}

	spin_lock_irqsave(&slow_work_queue_lock, flags);
	if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
		clear_bit(SLOW_WORK_DELAYED, &work->flags);

		if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
			/* we discard the reference the timer was holding in
			 * favour of the one the executor holds */
			set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
			put = true;
		} else {
			slow_work_mark_time(work);
			list_add_tail(&work->link, queue);
			queued = true;
			if (work->link.prev == queue)
				first = true;
		}
	}

	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	if (put)
		slow_work_put_ref(work);
	if (first)
		wake_up(wfo_wq);
	if (queued)
		wake_up(&slow_work_thread_wq);
}

/**
 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
 * @dwork: The delayed work item to queue
 * @delay: When to start executing the work, in jiffies from now
 *
 * This is similar to slow_work_enqueue(), but it adds a delay before the work
 * is actually queued for processing.
 *
 * The item can have delayed processing requested on it whilst it is being
 * executed. The delay will begin immediately, and if it expires before the
 * item finishes executing, the item will be placed back on the queue when it
 * has done executing.
 */
int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
			      unsigned long delay)
{
	struct slow_work *work = &dwork->work;
	unsigned long flags;
	int ret;

	if (delay == 0)
		return slow_work_enqueue(&dwork->work);

	BUG_ON(slow_work_user_count <= 0);
	BUG_ON(!work);
	BUG_ON(!work->ops);

	if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
		return -ECANCELED;

	if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
		spin_lock_irqsave(&slow_work_queue_lock, flags);

		if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
			goto cancelled;

		/* the timer holds a reference whilst it is pending */
		ret = work->ops->get_ref(work);
		if (ret < 0)
			goto cant_get_ref;

		if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
			BUG();
		dwork->timer.expires = jiffies + delay;
		dwork->timer.data = (unsigned long) work;
		dwork->timer.function = delayed_slow_work_timer;
		add_timer(&dwork->timer);

		spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	}

	return 0;

cancelled:
	ret = -ECANCELED;
cant_get_ref:
	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return ret;
}
EXPORT_SYMBOL(delayed_slow_work_enqueue);
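
/*
 * Illustrative sketch (not part of this file): deferring the same
 * hypothetical item by five seconds.  delayed_slow_work_init() and
 * delayed_slow_work_cancel() are the wrappers declared in
 * <linux/slow-work.h> at the time.
 */
#if 0
	struct delayed_slow_work dwork;

	delayed_slow_work_init(&dwork, &my_work_ops);
	ret = delayed_slow_work_enqueue(&dwork, 5 * HZ);

	/* on teardown; also waits out any execution in progress */
	delayed_slow_work_cancel(&dwork);
#endif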

/*
 * Schedule a cull of the thread pool at some time in the near future
 */
static void slow_work_schedule_cull(void)
{
	mod_timer(&slow_work_cull_timer,
		  round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
}

/*
 * Worker thread culling algorithm
 */
static bool slow_work_cull_thread(void)
{
	unsigned long flags;
	bool do_cull = false;

	spin_lock_irqsave(&slow_work_queue_lock, flags);

	if (slow_work_cull) {
		slow_work_cull = false;

		if (list_empty(&slow_work_queue) &&
		    list_empty(&vslow_work_queue) &&
		    atomic_read(&slow_work_thread_count) >
		    slow_work_min_threads) {
			slow_work_schedule_cull();
			do_cull = true;
		}
	}

	spin_unlock_irqrestore(&slow_work_queue_lock, flags);
	return do_cull;
}

/*
 * Determine if there is slow work available for dispatch
 */
static inline bool slow_work_available(int vsmax)
{
	return !list_empty(&slow_work_queue) ||
		(!list_empty(&vslow_work_queue) &&
		 atomic_read(&vslow_work_executing_count) < vsmax);
}

/*
 * Worker thread dispatcher
 */
static int slow_work_thread(void *_data)
{
	int vsmax, id;

	DEFINE_WAIT(wait);

	set_freezable();
	set_user_nice(current, -5);

	/* allocate ourselves an ID */
	spin_lock_irq(&slow_work_queue_lock);
	id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
	BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
	__set_bit(id, slow_work_ids);
	slow_work_set_thread_pid(id, current->pid);
	spin_unlock_irq(&slow_work_queue_lock);

	sprintf(current->comm, "kslowd%03u", id);

	for (;;) {
		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
					  TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !slow_work_threads_should_exit &&
		    !slow_work_available(vsmax) &&
		    !slow_work_cull)
			schedule();
		finish_wait(&slow_work_thread_wq, &wait);

		try_to_freeze();

		vsmax = vslow_work_proportion;
		vsmax *= atomic_read(&slow_work_thread_count);
		vsmax /= 100;

		if (slow_work_available(vsmax) && slow_work_execute(id)) {
			cond_resched();
			if (list_empty(&slow_work_queue) &&
			    list_empty(&vslow_work_queue) &&
			    atomic_read(&slow_work_thread_count) >
			    slow_work_min_threads)
				slow_work_schedule_cull();
			continue;
		}

		if (slow_work_threads_should_exit)
			break;

		if (slow_work_cull && slow_work_cull_thread())
			break;
	}

	spin_lock_irq(&slow_work_queue_lock);
	slow_work_set_thread_pid(id, 0);
	__clear_bit(id, slow_work_ids);
	spin_unlock_irq(&slow_work_queue_lock);

	if (atomic_dec_and_test(&slow_work_thread_count))
		complete_and_exit(&slow_work_last_thread_exited, 0);
	return 0;
}

/*
 * Handle thread cull timer expiration
 */
static void slow_work_cull_timeout(unsigned long data)
{
	slow_work_cull = true;
	wake_up(&slow_work_thread_wq);
}

/*
 * Start a new slow work thread
 */
static void slow_work_new_thread_execute(struct slow_work *work)
{
	struct task_struct *p;

	if (slow_work_threads_should_exit)
		return;

	if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
		return;

	if (!mutex_trylock(&slow_work_user_lock))
		return;

	slow_work_may_not_start_new_thread = true;
	atomic_inc(&slow_work_thread_count);
	p = kthread_run(slow_work_thread, NULL, "kslowd");
	if (IS_ERR(p)) {
		printk(KERN_DEBUG "Slow work thread pool: OOM\n");
		if (atomic_dec_and_test(&slow_work_thread_count))
			BUG(); /* we're running on a slow work thread... */
		mod_timer(&slow_work_oom_timer,
			  round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
	} else {
		/* ratelimit the starting of new threads */
		mod_timer(&slow_work_oom_timer, jiffies + 1);
	}

	mutex_unlock(&slow_work_user_lock);
}

static const struct slow_work_ops slow_work_new_thread_ops = {
	.owner		= THIS_MODULE,
	.execute	= slow_work_new_thread_execute,
#ifdef CONFIG_SLOW_WORK_DEBUG
	.desc		= slow_work_new_thread_desc,
#endif
};

/*
 * post-OOM new thread start suppression expiration
 */
static void slow_work_oom_timeout(unsigned long data)
{
	slow_work_may_not_start_new_thread = false;
}

#ifdef CONFIG_SYSCTL
/*
 * Handle adjustment of the minimum number of threads
 */
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to start or stop threads */
			n = atomic_read(&slow_work_thread_count) -
				slow_work_min_threads;

			if (n < 0 && !slow_work_may_not_start_new_thread)
				slow_work_enqueue(&slow_work_new_thread);
			else if (n > 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}

/*
 * Handle adjustment of the maximum number of threads
 */
static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos)
{
	int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
	int n;

	if (ret == 0) {
		mutex_lock(&slow_work_user_lock);
		if (slow_work_user_count > 0) {
			/* see if we need to stop threads */
			n = slow_work_max_threads -
				atomic_read(&slow_work_thread_count);

			if (n < 0)
				slow_work_schedule_cull();
		}
		mutex_unlock(&slow_work_user_lock);
	}

	return ret;
}
#endif /* CONFIG_SYSCTL */

/**
 * slow_work_register_user - Register a user of the facility
 * @module: The module about to make use of the facility
 *
 * Register a user of the facility, starting up the initial threads if there
 * aren't any other users at this point. This will return 0 if successful, or
 * an error if not.
 */
int slow_work_register_user(struct module *module)
{
	struct task_struct *p;
	int loop;

	mutex_lock(&slow_work_user_lock);

	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
		init_completion(&slow_work_last_thread_exited);

		slow_work_threads_should_exit = false;
		slow_work_init(&slow_work_new_thread,
			       &slow_work_new_thread_ops);
		slow_work_may_not_start_new_thread = false;
		slow_work_cull = false;

		/* start the minimum number of threads */
		for (loop = 0; loop < slow_work_min_threads; loop++) {
			atomic_inc(&slow_work_thread_count);
			p = kthread_run(slow_work_thread, NULL, "kslowd");
			if (IS_ERR(p))
				goto error;
		}
		printk(KERN_NOTICE "Slow work thread pool: Ready\n");
	}

	slow_work_user_count++;
	mutex_unlock(&slow_work_user_lock);
	return 0;

error:
	if (atomic_dec_and_test(&slow_work_thread_count))
		complete(&slow_work_last_thread_exited);
	if (loop > 0) {
		printk(KERN_ERR "Slow work thread pool:"
		       " Aborting startup on ENOMEM\n");
		slow_work_threads_should_exit = true;
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_ERR "Slow work thread pool: Aborted\n");
	}
	mutex_unlock(&slow_work_user_lock);
	return PTR_ERR(p);
}
EXPORT_SYMBOL(slow_work_register_user);
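
/*
 * Illustrative sketch (not part of this file): how a client module would
 * typically bracket its use of the facility from its init/exit paths, much
 * as FS-Cache does.  Everything other than the two slow-work calls is a
 * hypothetical name.
 */
#if 0
static int __init my_module_init(void)
{
	int ret = slow_work_register_user(THIS_MODULE);
	if (ret < 0)
		return ret;
	/* set up the rest of the module here */
	return 0;
}

static void __exit my_module_exit(void)
{
	/* waits for this module's outstanding items, then drops the pool */
	slow_work_unregister_user(THIS_MODULE);
}
#endif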

/*
 * wait for all outstanding items from the calling module to complete
 * - note that more items may be queued whilst we're waiting
 */
static void slow_work_wait_for_items(struct module *module)
{
#ifdef CONFIG_MODULES
	DECLARE_WAITQUEUE(myself, current);
	struct slow_work *work;
	int loop;

	mutex_lock(&slow_work_unreg_sync_lock);
	add_wait_queue(&slow_work_unreg_wq, &myself);

	for (;;) {
		spin_lock_irq(&slow_work_queue_lock);

		/* first of all, we wait for the last queued item in each list
		 * to be processed */
		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}
		list_for_each_entry_reverse(work, &slow_work_queue, link) {
			if (work->owner == module) {
				set_current_state(TASK_UNINTERRUPTIBLE);
				slow_work_unreg_work_item = work;
				goto do_wait;
			}
		}

		/* then we wait for the items being processed to finish */
		slow_work_unreg_module = module;
		smp_mb();
		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
			if (slow_work_thread_processing[loop] == module)
				goto do_wait;
		}
		spin_unlock_irq(&slow_work_queue_lock);
		break; /* okay, we're done */

	do_wait:
		spin_unlock_irq(&slow_work_queue_lock);
		schedule();
		slow_work_unreg_work_item = NULL;
		slow_work_unreg_module = NULL;
	}

	remove_wait_queue(&slow_work_unreg_wq, &myself);
	mutex_unlock(&slow_work_unreg_sync_lock);
#endif /* CONFIG_MODULES */
}

/**
 * slow_work_unregister_user - Unregister a user of the facility
 * @module: The module whose items should be cleared
 *
 * Unregister a user of the facility, killing all the threads if this was the
 * last one.
 *
 * This waits for all the work items belonging to the nominated module to go
 * away before proceeding.
 */
void slow_work_unregister_user(struct module *module)
{
	/* first of all, wait for all outstanding items from the calling module
	 * to complete */
	if (module)
		slow_work_wait_for_items(module);

	/* then we can actually go about shutting down the facility if need
	 * be */
	mutex_lock(&slow_work_user_lock);

	BUG_ON(slow_work_user_count <= 0);

	slow_work_user_count--;
	if (slow_work_user_count == 0) {
		printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
		slow_work_threads_should_exit = true;
		del_timer_sync(&slow_work_cull_timer);
		del_timer_sync(&slow_work_oom_timer);
		wake_up_all(&slow_work_thread_wq);
		wait_for_completion(&slow_work_last_thread_exited);
		printk(KERN_NOTICE "Slow work thread pool:"
		       " Shut down complete\n");
	}

	mutex_unlock(&slow_work_user_lock);
}
EXPORT_SYMBOL(slow_work_unregister_user);

/*
 * Initialise the slow work facility
 */
static int __init init_slow_work(void)
{
	unsigned nr_cpus = num_possible_cpus();

	if (slow_work_max_threads < nr_cpus)
		slow_work_max_threads = nr_cpus;
#ifdef CONFIG_SYSCTL
	if (slow_work_max_max_threads < nr_cpus * 2)
		slow_work_max_max_threads = nr_cpus * 2;
#endif
#ifdef CONFIG_SLOW_WORK_DEBUG
	{
		struct dentry *dbdir;

		dbdir = debugfs_create_dir("slow_work", NULL);
		if (dbdir && !IS_ERR(dbdir))
			debugfs_create_file("runqueue", S_IFREG | 0400, dbdir,
					    NULL, &slow_work_runqueue_fops);
	}
#endif
	return 0;
}

subsys_initcall(init_slow_work);