2 * Copyright (c) 2000 Doug Rabson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $"
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/thread2.h>
39 #include <sys/spinlock.h>
40 #include <sys/spinlock2.h>
41 #include <sys/serialize.h>
44 MALLOC_DEFINE(M_TASKQUEUE
, "taskqueue", "Task Queues");
46 static STAILQ_HEAD(taskqueue_list
, taskqueue
) taskqueue_queues
;
47 static struct lock taskqueue_queues_lock
;
50 STAILQ_ENTRY(taskqueue
) tq_link
;
51 STAILQ_HEAD(, task
) tq_queue
;
53 /* NOTE: tq must be locked before calling tq_enqueue */
54 taskqueue_enqueue_fn tq_enqueue
;
57 struct task
*tq_running
;
58 struct spinlock tq_lock
;
59 struct thread
**tq_threads
;
65 #define TQ_FLAGS_ACTIVE (1 << 0)
66 #define TQ_FLAGS_BLOCKED (1 << 1)
67 #define TQ_FLAGS_PENDING (1 << 2)
69 #define DT_CALLOUT_ARMED (1 << 0)
72 _timeout_task_init(struct taskqueue
*queue
, struct timeout_task
*timeout_task
,
73 int priority
, task_fn_t func
, void *context
)
76 TASK_INIT(&timeout_task
->t
, priority
, func
, context
);
77 callout_init(&timeout_task
->c
); /* XXX use callout_init_mp() */
78 timeout_task
->q
= queue
;
82 static void taskqueue_run(struct taskqueue
*queue
, int lock_held
);
85 TQ_LOCK_INIT(struct taskqueue
*tq
)
87 spin_init(&tq
->tq_lock
, "tqlock");
91 TQ_LOCK_UNINIT(struct taskqueue
*tq
)
93 spin_uninit(&tq
->tq_lock
);
97 TQ_LOCK(struct taskqueue
*tq
)
99 spin_lock(&tq
->tq_lock
);
103 TQ_UNLOCK(struct taskqueue
*tq
)
105 spin_unlock(&tq
->tq_lock
);
109 TQ_SLEEP(struct taskqueue
*tq
, void *ident
, const char *wmesg
)
111 ssleep(ident
, &tq
->tq_lock
, 0, wmesg
, 0);
115 taskqueue_create(const char *name
, int mflags
,
116 taskqueue_enqueue_fn enqueue
, void *context
)
118 struct taskqueue
*queue
;
120 queue
= kmalloc(sizeof(*queue
), M_TASKQUEUE
, mflags
| M_ZERO
);
123 STAILQ_INIT(&queue
->tq_queue
);
124 queue
->tq_name
= name
;
125 queue
->tq_enqueue
= enqueue
;
126 queue
->tq_context
= context
;
127 queue
->tq_flags
|= TQ_FLAGS_ACTIVE
;
130 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
131 STAILQ_INSERT_TAIL(&taskqueue_queues
, queue
, tq_link
);
132 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
137 /* NOTE: tq must be locked */
139 taskqueue_terminate(struct thread
**pp
, struct taskqueue
*tq
)
141 while(tq
->tq_tcount
> 0) {
142 /* Unlock spinlock before wakeup() */
146 TQ_SLEEP(tq
, pp
, "taskqueue_terminate");
151 taskqueue_free(struct taskqueue
*queue
)
154 queue
->tq_flags
&= ~TQ_FLAGS_ACTIVE
;
155 taskqueue_run(queue
, 1);
156 taskqueue_terminate(queue
->tq_threads
, queue
);
159 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
160 STAILQ_REMOVE(&taskqueue_queues
, queue
, taskqueue
, tq_link
);
161 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
163 TQ_LOCK_UNINIT(queue
);
165 kfree(queue
, M_TASKQUEUE
);
169 taskqueue_find(const char *name
)
171 struct taskqueue
*queue
;
173 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
174 STAILQ_FOREACH(queue
, &taskqueue_queues
, tq_link
) {
175 if (!strcmp(queue
->tq_name
, name
)) {
176 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
180 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
185 * NOTE! If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
186 * be sure NOT TO SHARE the ``task'' between CPUs. TASKS ARE NOT LOCKED.
187 * So either use a throwaway task which will only be enqueued once, or
188 * use one task per CPU!
191 taskqueue_enqueue_locked(struct taskqueue
*queue
, struct task
*task
)
197 * Don't allow new tasks on a queue which is being freed.
199 if ((queue
->tq_flags
& TQ_FLAGS_ACTIVE
) == 0)
203 * Count multiple enqueues.
205 if (task
->ta_pending
) {
211 * Optimise the case when all tasks have the same priority.
213 prev
= STAILQ_LAST(&queue
->tq_queue
, task
, ta_link
);
214 if (!prev
|| prev
->ta_priority
>= task
->ta_priority
) {
215 STAILQ_INSERT_TAIL(&queue
->tq_queue
, task
, ta_link
);
218 for (ins
= STAILQ_FIRST(&queue
->tq_queue
); ins
;
219 prev
= ins
, ins
= STAILQ_NEXT(ins
, ta_link
))
220 if (ins
->ta_priority
< task
->ta_priority
)
224 STAILQ_INSERT_AFTER(&queue
->tq_queue
, prev
, task
, ta_link
);
226 STAILQ_INSERT_HEAD(&queue
->tq_queue
, task
, ta_link
);
229 task
->ta_pending
= 1;
230 if ((queue
->tq_flags
& TQ_FLAGS_BLOCKED
) == 0) {
231 if (queue
->tq_enqueue
)
232 queue
->tq_enqueue(queue
->tq_context
);
234 queue
->tq_flags
|= TQ_FLAGS_PENDING
;
241 taskqueue_enqueue(struct taskqueue
*queue
, struct task
*task
)
246 res
= taskqueue_enqueue_locked(queue
, task
);
253 taskqueue_timeout_func(void *arg
)
255 struct taskqueue
*queue
;
256 struct timeout_task
*timeout_task
;
259 queue
= timeout_task
->q
;
262 KASSERT((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0, ("Stray timeout"));
263 timeout_task
->f
&= ~DT_CALLOUT_ARMED
;
264 queue
->tq_callouts
--;
265 taskqueue_enqueue_locked(timeout_task
->q
, &timeout_task
->t
);
270 taskqueue_enqueue_timeout(struct taskqueue
*queue
,
271 struct timeout_task
*timeout_task
, int ticks
)
276 KASSERT(timeout_task
->q
== NULL
|| timeout_task
->q
== queue
,
278 timeout_task
->q
= queue
;
279 res
= timeout_task
->t
.ta_pending
;
281 taskqueue_enqueue_locked(queue
, &timeout_task
->t
);
284 if ((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0) {
287 queue
->tq_callouts
++;
288 timeout_task
->f
|= DT_CALLOUT_ARMED
;
291 callout_reset(&timeout_task
->c
, ticks
, taskqueue_timeout_func
,
298 taskqueue_block(struct taskqueue
*queue
)
301 queue
->tq_flags
|= TQ_FLAGS_BLOCKED
;
306 taskqueue_unblock(struct taskqueue
*queue
)
309 queue
->tq_flags
&= ~TQ_FLAGS_BLOCKED
;
310 if (queue
->tq_flags
& TQ_FLAGS_PENDING
) {
311 queue
->tq_flags
&= ~TQ_FLAGS_PENDING
;
312 if (queue
->tq_enqueue
)
313 queue
->tq_enqueue(queue
->tq_context
);
319 taskqueue_run(struct taskqueue
*queue
, int lock_held
)
326 while (STAILQ_FIRST(&queue
->tq_queue
)) {
328 * Carefully remove the first task from the queue and
329 * zero its pending count.
331 task
= STAILQ_FIRST(&queue
->tq_queue
);
332 STAILQ_REMOVE_HEAD(&queue
->tq_queue
, ta_link
);
333 pending
= task
->ta_pending
;
334 task
->ta_pending
= 0;
335 queue
->tq_running
= task
;
338 task
->ta_func(task
->ta_context
, pending
);
339 queue
->tq_running
= NULL
;
348 taskqueue_cancel_locked(struct taskqueue
*queue
, struct task
*task
,
352 if (task
->ta_pending
> 0)
353 STAILQ_REMOVE(&queue
->tq_queue
, task
, task
, ta_link
);
355 *pendp
= task
->ta_pending
;
356 task
->ta_pending
= 0;
357 return (task
== queue
->tq_running
? EBUSY
: 0);
361 taskqueue_cancel(struct taskqueue
*queue
, struct task
*task
, u_int
*pendp
)
366 error
= taskqueue_cancel_locked(queue
, task
, pendp
);
373 taskqueue_cancel_timeout(struct taskqueue
*queue
,
374 struct timeout_task
*timeout_task
, u_int
*pendp
)
376 u_int pending
, pending1
;
379 pending
= !!callout_stop(&timeout_task
->c
);
381 error
= taskqueue_cancel_locked(queue
, &timeout_task
->t
, &pending1
);
382 if ((timeout_task
->f
& DT_CALLOUT_ARMED
) != 0) {
383 timeout_task
->f
&= ~DT_CALLOUT_ARMED
;
384 queue
->tq_callouts
--;
389 *pendp
= pending
+ pending1
;
394 taskqueue_drain(struct taskqueue
*queue
, struct task
*task
)
397 while (task
->ta_pending
!= 0 || task
== queue
->tq_running
)
398 TQ_SLEEP(queue
, task
, "-");
403 taskqueue_drain_timeout(struct taskqueue
*queue
,
404 struct timeout_task
*timeout_task
)
407 callout_stop_sync(&timeout_task
->c
);
408 taskqueue_drain(queue
, &timeout_task
->t
);
412 taskqueue_swi_enqueue(void *context
)
418 taskqueue_swi_run(void *arg
, void *frame
)
420 taskqueue_run(taskqueue_swi
, 0);
424 taskqueue_swi_mp_run(void *arg
, void *frame
)
426 taskqueue_run(taskqueue_swi_mp
, 0);
430 taskqueue_start_threads(struct taskqueue
**tqp
, int count
, int pri
, int ncpu
,
431 const char *fmt
, ...)
435 struct taskqueue
*tq
;
437 char ktname
[MAXCOMLEN
];
446 kvsnprintf(ktname
, MAXCOMLEN
, fmt
, ap
);
449 tq
->tq_threads
= kmalloc(sizeof(struct thread
*) * count
, M_TASKQUEUE
,
452 for (i
= 0; i
< count
; i
++) {
454 * If no specific cpu was specified and more than one thread
455 * is to be created, we distribute the threads amongst all
458 if ((ncpu
<= -1) && (count
> 1))
462 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
463 &tq
->tq_threads
[i
], NULL
,
467 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
468 &tq
->tq_threads
[i
], NULL
,
473 kprintf("%s: lwkt_create(%s): error %d", __func__
,
475 tq
->tq_threads
[i
] = NULL
;
477 td
= tq
->tq_threads
[i
];
478 lwkt_setpri_initial(td
, pri
);
488 taskqueue_thread_loop(void *arg
)
490 struct taskqueue
**tqp
, *tq
;
495 while ((tq
->tq_flags
& TQ_FLAGS_ACTIVE
) != 0) {
496 taskqueue_run(tq
, 1);
497 TQ_SLEEP(tq
, tq
, "tqthr");
500 /* rendezvous with thread that asked us to terminate */
503 wakeup_one(tq
->tq_threads
);
507 /* NOTE: tq must be locked */
509 taskqueue_thread_enqueue(void *context
)
511 struct taskqueue
**tqp
, *tq
;
516 /* Unlock spinlock before wakeup_one() */
522 TASKQUEUE_DEFINE(swi
, taskqueue_swi_enqueue
, 0,
523 register_swi(SWI_TQ
, taskqueue_swi_run
, NULL
, "swi_taskq", NULL
, -1));
525 * XXX: possibly use a different SWI_TQ_MP or so.
526 * related: sys/interrupt.h
527 * related: platform/XXX/isa/ipl_funcs.c
529 TASKQUEUE_DEFINE(swi_mp
, taskqueue_swi_enqueue
, 0,
530 register_swi_mp(SWI_TQ
, taskqueue_swi_mp_run
, NULL
, "swi_mp_taskq", NULL
,
533 struct taskqueue
*taskqueue_thread
[MAXCPU
];
540 lockinit(&taskqueue_queues_lock
, "tqqueues", 0, 0);
541 STAILQ_INIT(&taskqueue_queues
);
543 for (cpu
= 0; cpu
< ncpus
; cpu
++) {
544 taskqueue_thread
[cpu
] = taskqueue_create("thread", M_INTWAIT
,
545 taskqueue_thread_enqueue
, &taskqueue_thread
[cpu
]);
546 taskqueue_start_threads(&taskqueue_thread
[cpu
], 1,
547 TDPRI_KERN_DAEMON
, cpu
, "taskq_cpu %d", cpu
);
551 SYSINIT(taskqueueinit
, SI_SUB_PRE_DRIVERS
, SI_ORDER_ANY
, taskqueue_init
, NULL
);