2 * Copyright (c) 2000 Doug Rabson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $"
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/spinlock.h>
39 #include <sys/spinlock2.h>
40 #include <sys/serialize.h>
43 MALLOC_DEFINE(M_TASKQUEUE
, "taskqueue", "Task Queues");
45 static STAILQ_HEAD(taskqueue_list
, taskqueue
) taskqueue_queues
;
46 static struct lock taskqueue_queues_lock
;
47 static struct spinlock taskqueue_queues_spin
;
50 STAILQ_ENTRY(taskqueue
) tq_link
;
51 STAILQ_HEAD(, task
) tq_queue
;
53 /* NOTE: tq must be locked before calling tq_enqueue */
54 taskqueue_enqueue_fn tq_enqueue
;
57 struct task
*tq_running
;
58 struct spinlock tq_lock
;
59 struct thread
**tq_threads
;
65 #define TQ_FLAGS_ACTIVE (1 << 0)
66 #define TQ_FLAGS_BLOCKED (1 << 1)
67 #define TQ_FLAGS_PENDING (1 << 2)
69 #define DT_CALLOUT_ARMED (1 << 0)
72 _timeout_task_init(struct taskqueue
*queue
, struct timeout_task
*timeout_task
,
73 int priority
, task_fn_t func
, void *context
)
76 TASK_INIT(&timeout_task
->t
, priority
, func
, context
);
77 callout_init_mp(&timeout_task
->c
);
78 timeout_task
->t
.ta_queue
= queue
;
82 static void taskqueue_run(struct taskqueue
*queue
, int lock_held
);
85 TQ_LOCK_INIT(struct taskqueue
*tq
)
87 spin_init(&tq
->tq_lock
, "tqlock");
91 TQ_LOCK_UNINIT(struct taskqueue
*tq
)
93 spin_uninit(&tq
->tq_lock
);
97 TQ_LOCK(struct taskqueue
*tq
)
99 spin_lock(&tq
->tq_lock
);
103 TQ_UNLOCK(struct taskqueue
*tq
)
105 spin_unlock(&tq
->tq_lock
);
109 TQ_SLEEP(struct taskqueue
*tq
, void *ident
, const char *wmesg
)
111 ssleep(ident
, &tq
->tq_lock
, 0, wmesg
, 0);
115 taskqueue_create(const char *name
, int mflags
,
116 taskqueue_enqueue_fn enqueue
, void *context
)
118 struct taskqueue
*queue
;
120 queue
= kmalloc(sizeof(*queue
), M_TASKQUEUE
, mflags
| M_ZERO
);
123 STAILQ_INIT(&queue
->tq_queue
);
124 queue
->tq_name
= name
;
125 queue
->tq_enqueue
= enqueue
;
126 queue
->tq_context
= context
;
127 queue
->tq_flags
|= TQ_FLAGS_ACTIVE
;
130 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
131 STAILQ_INSERT_TAIL(&taskqueue_queues
, queue
, tq_link
);
132 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
137 /* NOTE: tq must be locked */
139 taskqueue_terminate(struct thread
**pp
, struct taskqueue
*tq
)
141 while (tq
->tq_tcount
> 0) {
142 /* Unlock spinlock before wakeup() */
146 TQ_SLEEP(tq
, pp
, "taskqueue_terminate");
151 taskqueue_free(struct taskqueue
*queue
)
154 queue
->tq_flags
&= ~TQ_FLAGS_ACTIVE
;
155 taskqueue_run(queue
, 1);
156 taskqueue_terminate(queue
->tq_threads
, queue
);
159 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
160 STAILQ_REMOVE(&taskqueue_queues
, queue
, taskqueue
, tq_link
);
161 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
163 TQ_LOCK_UNINIT(queue
);
165 kfree(queue
, M_TASKQUEUE
);
169 taskqueue_find(const char *name
)
171 struct taskqueue
*queue
;
173 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
174 STAILQ_FOREACH(queue
, &taskqueue_queues
, tq_link
) {
175 if (!strcmp(queue
->tq_name
, name
)) {
176 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
180 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
185 * NOTE! If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
186 * be sure NOT TO SHARE the ``task'' between CPUs. TASKS ARE NOT LOCKED.
187 * So either use a throwaway task which will only be enqueued once, or
188 * use one task per CPU!
191 taskqueue_enqueue_locked(struct taskqueue
*queue
, struct task
*task
)
197 * Don't allow new tasks on a queue which is being freed.
199 if ((queue
->tq_flags
& TQ_FLAGS_ACTIVE
) == 0)
203 * Count multiple enqueues.
205 if (task
->ta_pending
) {
206 KKASSERT(queue
== task
->ta_queue
);
210 task
->ta_queue
= queue
;
213 * Optimise the case when all tasks have the same priority.
215 prev
= STAILQ_LAST(&queue
->tq_queue
, task
, ta_link
);
216 if (!prev
|| prev
->ta_priority
>= task
->ta_priority
) {
217 STAILQ_INSERT_TAIL(&queue
->tq_queue
, task
, ta_link
);
220 for (ins
= STAILQ_FIRST(&queue
->tq_queue
); ins
;
221 prev
= ins
, ins
= STAILQ_NEXT(ins
, ta_link
))
222 if (ins
->ta_priority
< task
->ta_priority
)
226 STAILQ_INSERT_AFTER(&queue
->tq_queue
, prev
, task
, ta_link
);
228 STAILQ_INSERT_HEAD(&queue
->tq_queue
, task
, ta_link
);
231 task
->ta_pending
= 1;
232 if ((queue
->tq_flags
& TQ_FLAGS_BLOCKED
) == 0) {
233 if (queue
->tq_enqueue
)
234 queue
->tq_enqueue(queue
->tq_context
);
236 queue
->tq_flags
|= TQ_FLAGS_PENDING
;
243 * This version requires that the task not be moved between queues
244 * in an uncontrolled fashion.
247 taskqueue_enqueue(struct taskqueue
*queue
, struct task
*task
)
252 res
= taskqueue_enqueue_locked(queue
, task
);
259 * This version allows a task to be moved between queues in an uncontrolled
260 * fashion. (*qpp) is set to the queue the task is (possibly already)
261 * enqueued on, or the specified queue if it is possible to move the task.
264 taskqueue_enqueue_optq(struct taskqueue
*queue
, struct taskqueue
**qpp
,
267 struct taskqueue
*qtmp
;
271 * Interlock for task structure check, handle the case where we
272 * are unable to safely shift the task to the specified queue.
275 qtmp
= task
->ta_queue
;
279 spin_lock(&taskqueue_queues_spin
);
280 if (task
->ta_queue
== NULL
)
281 task
->ta_queue
= queue
;
282 spin_unlock(&taskqueue_queues_spin
);
285 if (task
->ta_queue
== qtmp
) {
290 * If qtmp is pending on a different queue
291 * it must stay on that queue.
293 * WARNING: Once ta_queue is reassigned
294 * our qtmp lock is no longer
295 * sufficient and we lose control
298 if (task
->ta_pending
) {
305 task
->ta_queue
= queue
;
314 * The task is assigned to (queue), enqueue it there.
317 res
= taskqueue_enqueue_locked(queue
, task
);
324 taskqueue_timeout_func(void *arg
)
326 struct taskqueue
*queue
;
327 struct timeout_task
*timeout_task
;
330 queue
= timeout_task
->t
.ta_queue
;
333 KASSERT((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0, ("Stray timeout"));
334 timeout_task
->f
&= ~DT_CALLOUT_ARMED
;
335 queue
->tq_callouts
--;
336 taskqueue_enqueue_locked(queue
, &timeout_task
->t
);
341 taskqueue_enqueue_timeout(struct taskqueue
*queue
,
342 struct timeout_task
*timeout_task
, int ticks
)
347 KASSERT(timeout_task
->t
.ta_queue
== NULL
||
348 timeout_task
->t
.ta_queue
== queue
,
350 timeout_task
->t
.ta_queue
= queue
;
351 res
= timeout_task
->t
.ta_pending
;
353 taskqueue_enqueue_locked(queue
, &timeout_task
->t
);
356 if ((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0) {
359 queue
->tq_callouts
++;
360 timeout_task
->f
|= DT_CALLOUT_ARMED
;
363 callout_reset(&timeout_task
->c
, ticks
, taskqueue_timeout_func
,
370 taskqueue_block(struct taskqueue
*queue
)
373 queue
->tq_flags
|= TQ_FLAGS_BLOCKED
;
378 taskqueue_unblock(struct taskqueue
*queue
)
381 queue
->tq_flags
&= ~TQ_FLAGS_BLOCKED
;
382 if (queue
->tq_flags
& TQ_FLAGS_PENDING
) {
383 queue
->tq_flags
&= ~TQ_FLAGS_PENDING
;
384 if (queue
->tq_enqueue
)
385 queue
->tq_enqueue(queue
->tq_context
);
391 taskqueue_run(struct taskqueue
*queue
, int lock_held
)
398 while (STAILQ_FIRST(&queue
->tq_queue
)) {
400 * Carefully remove the first task from the queue and
401 * zero its pending count.
403 task
= STAILQ_FIRST(&queue
->tq_queue
);
404 STAILQ_REMOVE_HEAD(&queue
->tq_queue
, ta_link
);
405 pending
= task
->ta_pending
;
406 task
->ta_pending
= 0;
407 queue
->tq_running
= task
;
410 task
->ta_func(task
->ta_context
, pending
);
411 queue
->tq_running
= NULL
;
420 taskqueue_cancel_locked(struct taskqueue
*queue
, struct task
*task
,
424 if (task
->ta_pending
> 0)
425 STAILQ_REMOVE(&queue
->tq_queue
, task
, task
, ta_link
);
427 *pendp
= task
->ta_pending
;
428 task
->ta_pending
= 0;
429 return (task
== queue
->tq_running
? EBUSY
: 0);
433 taskqueue_cancel(struct taskqueue
*queue
, struct task
*task
, u_int
*pendp
)
438 error
= taskqueue_cancel_locked(queue
, task
, pendp
);
445 taskqueue_cancel_simple(struct task
*task
)
447 struct taskqueue
*queue
;
451 queue
= task
->ta_queue
;
458 if (queue
== task
->ta_queue
) {
459 error
= taskqueue_cancel_locked(queue
, task
, NULL
);
469 taskqueue_cancel_timeout(struct taskqueue
*queue
,
470 struct timeout_task
*timeout_task
, u_int
*pendp
)
472 u_int pending
, pending1
;
475 pending
= !!callout_stop(&timeout_task
->c
);
477 error
= taskqueue_cancel_locked(queue
, &timeout_task
->t
, &pending1
);
478 if ((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0) {
479 timeout_task
->f
&= ~DT_CALLOUT_ARMED
;
480 queue
->tq_callouts
--;
485 *pendp
= pending
+ pending1
;
490 taskqueue_drain(struct taskqueue
*queue
, struct task
*task
)
493 while (task
->ta_pending
!= 0 || task
== queue
->tq_running
)
494 TQ_SLEEP(queue
, task
, "-");
499 * Wait for the task to drain and return
502 taskqueue_drain_simple(struct task
*task
)
504 struct taskqueue
*queue
;
507 queue
= task
->ta_queue
;
512 if (task
->ta_pending
== 0 && task
!= queue
->tq_running
) {
516 TQ_SLEEP(queue
, task
, "-");
522 taskqueue_drain_timeout(struct taskqueue
*queue
,
523 struct timeout_task
*timeout_task
)
525 callout_cancel(&timeout_task
->c
);
526 taskqueue_drain(queue
, &timeout_task
->t
);
530 taskqueue_swi_enqueue(void *context
)
536 taskqueue_swi_run(void *arg
, void *frame
)
538 taskqueue_run(taskqueue_swi
, 0);
542 taskqueue_swi_mp_run(void *arg
, void *frame
)
544 taskqueue_run(taskqueue_swi_mp
, 0);
548 taskqueue_start_threads(struct taskqueue
**tqp
, int count
, int pri
, int ncpu
,
549 const char *fmt
, ...)
553 struct taskqueue
*tq
;
555 char ktname
[MAXCOMLEN
];
560 /* catch call argument mistakes */
561 KKASSERT(pri
> 0 && pri
< TDPRI_MAX
);
562 KKASSERT(tq
->tq_enqueue
== taskqueue_thread_enqueue
);
568 kvsnprintf(ktname
, MAXCOMLEN
, fmt
, ap
);
571 tq
->tq_threads
= kmalloc(sizeof(struct thread
*) * count
, M_TASKQUEUE
,
574 for (i
= 0; i
< count
; i
++) {
576 * If no specific cpu was specified and more than one thread
577 * is to be created, we distribute the threads amongst all
580 if ((ncpu
<= -1) && (count
> 1))
584 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
585 &tq
->tq_threads
[i
], NULL
,
589 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
590 &tq
->tq_threads
[i
], NULL
,
595 kprintf("%s: lwkt_create(%s): error %d", __func__
,
597 tq
->tq_threads
[i
] = NULL
;
599 td
= tq
->tq_threads
[i
];
600 lwkt_setpri_initial(td
, pri
);
610 taskqueue_thread_loop(void *arg
)
612 struct taskqueue
**tqp
, *tq
;
617 while ((tq
->tq_flags
& TQ_FLAGS_ACTIVE
) != 0) {
618 taskqueue_run(tq
, 1);
619 TQ_SLEEP(tq
, tq
, "tqthr");
622 /* rendezvous with thread that asked us to terminate */
625 wakeup_one(tq
->tq_threads
);
629 /* NOTE: tq must be locked */
631 taskqueue_thread_enqueue(void *context
)
633 struct taskqueue
**tqp
, *tq
;
638 /* Unlock spinlock before wakeup_one() */
644 TASKQUEUE_DEFINE(swi
, taskqueue_swi_enqueue
, 0,
645 register_swi(SWI_TQ
, taskqueue_swi_run
, NULL
, "swi_taskq", NULL
, -1));
647 * XXX: possibly use a different SWI_TQ_MP or so.
648 * related: sys/interrupt.h
649 * related: platform/XXX/isa/ipl_funcs.c
651 TASKQUEUE_DEFINE(swi_mp
, taskqueue_swi_enqueue
, 0,
652 register_swi_mp(SWI_TQ
, taskqueue_swi_mp_run
, NULL
, "swi_mp_taskq", NULL
,
655 struct taskqueue
*taskqueue_thread
[MAXCPU
];
662 lockinit(&taskqueue_queues_lock
, "tqqueues", 0, 0);
663 spin_init(&taskqueue_queues_spin
, "tqspin");
664 STAILQ_INIT(&taskqueue_queues
);
666 for (cpu
= 0; cpu
< ncpus
; cpu
++) {
667 taskqueue_thread
[cpu
] = taskqueue_create("thread", M_INTWAIT
,
668 taskqueue_thread_enqueue
, &taskqueue_thread
[cpu
]);
669 taskqueue_start_threads(&taskqueue_thread
[cpu
], 1,
670 TDPRI_KERN_DAEMON
, cpu
, "taskq_cpu %d", cpu
);
674 SYSINIT(taskqueueinit
, SI_SUB_PRE_DRIVERS
, SI_ORDER_FIRST
, taskqueue_init
, NULL
);