2 * Copyright (c) 2000 Doug Rabson
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.1.2.3 2003/09/10 00:40:39 ken Exp $
27 * $DragonFly: src/sys/kern/subr_taskqueue.c,v 1.13 2008/06/07 11:44:04 mneumann Exp $
30 #include <sys/param.h>
31 #include <sys/queue.h>
32 #include <sys/systm.h>
33 #include <sys/kernel.h>
34 #include <sys/taskqueue.h>
35 #include <sys/interrupt.h>
37 #include <sys/malloc.h>
38 #include <sys/kthread.h>
39 #include <sys/thread2.h>
40 #include <sys/spinlock.h>
41 #include <sys/spinlock2.h>
42 #include <sys/serialize.h>
44 #include <machine/varargs.h>
/*
 * Module-global state for the taskqueue subsystem.
 * NOTE(review): extraction has dropped lines here (original numbering jumps
 * 46->48->49->52...); the "struct taskqueue {" header and several fields
 * (tq_name, tq_context, tq_flags, tq_tcount — all referenced below) are not
 * visible in this fragment.
 */
/* Malloc type used for all taskqueue allocations (queues + thread arrays). */
46 MALLOC_DEFINE(M_TASKQUEUE
, "taskqueue", "Task Queues");
/* Global list of every taskqueue in the system, guarded by the lock below. */
48 static STAILQ_HEAD(taskqueue_list
, taskqueue
) taskqueue_queues
;
49 static struct lock taskqueue_queues_lock
;
/* --- fields of struct taskqueue (struct header elided by extraction) --- */
/* Linkage on the global taskqueue_queues list. */
52 STAILQ_ENTRY(taskqueue
) tq_link
;
/* Pending tasks, kept sorted by ta_priority (see taskqueue_enqueue). */
53 STAILQ_HEAD(, task
) tq_queue
;
/* Callback invoked to kick the consumer when work arrives. */
55 taskqueue_enqueue_fn tq_enqueue
;
/* Task currently being executed, or NULL; used by taskqueue_drain. */
58 struct task
*tq_running
;
/* Spinlock protecting the queue contents and flags. */
59 struct spinlock tq_lock
;
/* Array of worker threads created by taskqueue_start_threads. */
60 struct thread
**tq_threads
;
/* tq_flags bits. */
65 #define TQ_FLAGS_ACTIVE (1 << 0)
66 #define TQ_FLAGS_BLOCKED (1 << 1)
67 #define TQ_FLAGS_PENDING (1 << 2)
/* Forward declaration: lock_held tells taskqueue_run whether the caller
 * already holds tq_lock. */
69 static void taskqueue_run(struct taskqueue
*queue
, int lock_held
);
/*
 * Thin wrappers around the per-queue spinlock (tq_lock).
 * NOTE(review): the "static void" return types and the brace lines of these
 * helpers were elided by extraction (original numbering gaps 72->74, 78->80,
 * etc.); only the signatures and single-statement bodies remain visible.
 */
/* Initialize the queue's spinlock. */
72 TQ_LOCK_INIT(struct taskqueue
*tq
)
74 spin_init(&tq
->tq_lock
);
/* Destroy the queue's spinlock (on taskqueue_free). */
78 TQ_LOCK_UNINIT(struct taskqueue
*tq
)
80 spin_uninit(&tq
->tq_lock
);
/* Acquire the queue's spinlock exclusively. */
84 TQ_LOCK(struct taskqueue
*tq
)
86 spin_lock_wr(&tq
->tq_lock
);
/* Release the queue's spinlock. */
90 TQ_UNLOCK(struct taskqueue
*tq
)
92 spin_unlock_wr(&tq
->tq_lock
);
/* Sleep on ident, atomically dropping (and on wakeup reacquiring) tq_lock.
 * No timeout (0), no flags (0). */
96 TQ_SLEEP(struct taskqueue
*tq
, void *ident
, const char *wmesg
)
98 ssleep(ident
, &tq
->tq_lock
, 0, wmesg
, 0);
/*
 * Allocate and initialize a new taskqueue and register it on the global
 * taskqueue_queues list.
 *
 * name:    identifying string, looked up by taskqueue_find(); the pointer is
 *          stored, not copied — caller must keep it valid.
 * mflags:  kmalloc flags (M_ZERO is OR'd in unconditionally below).
 * enqueue: callback run when a task is queued, to wake the consumer.
 * context: opaque argument passed to the enqueue callback.
 *
 * NOTE(review): the "return queue;" and the kmalloc-failure path are elided
 * by extraction (numbering jumps 114->117, 119->...); presumably the function
 * returns the new queue — confirm against the full source.
 */
102 taskqueue_create(const char *name
, int mflags
,
103 taskqueue_enqueue_fn enqueue
, void *context
)
105 struct taskqueue
*queue
;
107 queue
= kmalloc(sizeof(*queue
), M_TASKQUEUE
, mflags
| M_ZERO
);
110 STAILQ_INIT(&queue
->tq_queue
);
111 queue
->tq_name
= name
;
112 queue
->tq_enqueue
= enqueue
;
113 queue
->tq_context
= context
;
/* Mark the queue live so taskqueue_enqueue() accepts tasks. */
114 queue
->tq_flags
|= TQ_FLAGS_ACTIVE
;
/* Publish on the global list under the list lock. */
117 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
118 STAILQ_INSERT_TAIL(&taskqueue_queues
, queue
, tq_link
);
119 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
/*
 * Wait for all of the queue's worker threads to exit: sleep on pp until
 * the live-thread count (tq_tcount) drains to zero.
 * NOTE(review): the wakeup that nudges the workers and the loop's closing
 * brace are elided by extraction (numbering gap 127->129); the workers'
 * exit path (taskqueue_thread_loop) does wakeup_one(tq->tq_threads).
 */
125 taskqueue_terminate(struct thread
**pp
, struct taskqueue
*tq
)
127 while(tq
->tq_tcount
> 0) {
129 TQ_SLEEP(tq
, pp
, "taskqueue_terminate");
/*
 * Tear down a taskqueue: stop accepting new tasks, run whatever is still
 * queued, wait for worker threads to exit, unlink from the global list,
 * and free the queue.
 */
134 taskqueue_free(struct taskqueue
*queue
)
/* Clearing ACTIVE makes taskqueue_enqueue() reject new tasks and lets
 * taskqueue_thread_loop() exit its service loop. */
137 queue
->tq_flags
&= ~TQ_FLAGS_ACTIVE
;
/* Drain remaining tasks; 1 = tq_lock already held by this caller. */
138 taskqueue_run(queue
, 1);
139 taskqueue_terminate(queue
->tq_threads
, queue
);
/* Unlink from the global registry before freeing. */
142 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
143 STAILQ_REMOVE(&taskqueue_queues
, queue
, taskqueue
, tq_link
);
144 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
146 TQ_LOCK_UNINIT(queue
);
148 kfree(queue
, M_TASKQUEUE
);
/*
 * Look up a taskqueue by name on the global list.
 * NOTE(review): both return statements (the found-queue return inside the
 * loop and the NULL return at the end) are elided by extraction (numbering
 * gaps 159->163 and after 163); only the lock/scan skeleton is visible.
 */
152 taskqueue_find(const char *name
)
154 struct taskqueue
*queue
;
156 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
157 STAILQ_FOREACH(queue
, &taskqueue_queues
, tq_link
) {
158 if (!strcmp(queue
->tq_name
, name
)) {
/* Match: drop the list lock before returning the queue. */
159 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
/* No match: drop the lock (return NULL elided). */
163 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
168 * NOTE! If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
169 * be sure NOT TO SHARE the ``task'' between CPUs. TASKS ARE NOT LOCKED.
170 * So either use a throwaway task which will only be enqueued once, or
171 * use one task per CPU!
/*
 * Queue a task for execution, keeping tq_queue sorted by descending
 * ta_priority, and kick the consumer via tq_enqueue unless the queue
 * is blocked (in which case PENDING is latched for taskqueue_unblock).
 *
 * NOTE(review): local declarations for prev/ins, the TQ_LOCK/TQ_UNLOCK
 * calls, return statements, and several closing braces are elided by
 * extraction (numbering gaps 174->182, 184->190, etc.).
 */
174 taskqueue_enqueue(struct taskqueue
*queue
, struct task
*task
)
182 * Don't allow new tasks on a queue which is being freed.
184 if ((queue
->tq_flags
& TQ_FLAGS_ACTIVE
) == 0) {
190 * Count multiple enqueues.
/* Already queued: bump the pending count instead of re-inserting
 * (handler elided — presumably increments ta_pending and returns). */
192 if (task
->ta_pending
) {
199 * Optimise the case when all tasks have the same priority.
/* Fast path: new task sorts at (or after) the current tail. */
201 prev
= STAILQ_LAST(&queue
->tq_queue
, task
, ta_link
);
202 if (!prev
|| prev
->ta_priority
>= task
->ta_priority
) {
203 STAILQ_INSERT_TAIL(&queue
->tq_queue
, task
, ta_link
);
/* Slow path: linear scan for the first entry with lower priority;
 * insert before it (i.e. after prev), or at the head if none precede. */
206 for (ins
= STAILQ_FIRST(&queue
->tq_queue
); ins
;
207 prev
= ins
, ins
= STAILQ_NEXT(ins
, ta_link
))
208 if (ins
->ta_priority
< task
->ta_priority
)
212 STAILQ_INSERT_AFTER(&queue
->tq_queue
, prev
, task
, ta_link
);
214 STAILQ_INSERT_HEAD(&queue
->tq_queue
, task
, ta_link
);
217 task
->ta_pending
= 1;
/* Kick the consumer unless the queue is administratively blocked;
 * a blocked queue latches PENDING so unblock can deliver the kick. */
218 if ((queue
->tq_flags
& TQ_FLAGS_BLOCKED
) == 0) {
219 if (queue
->tq_enqueue
)
220 queue
->tq_enqueue(queue
->tq_context
);
222 queue
->tq_flags
|= TQ_FLAGS_PENDING
;
/*
 * Suppress the tq_enqueue consumer kick: tasks still queue up, but the
 * consumer is not notified until taskqueue_unblock() is called.
 */
231 taskqueue_block(struct taskqueue
*queue
)
234 queue
->tq_flags
|= TQ_FLAGS_BLOCKED
;
/*
 * Re-enable consumer kicks and, if any enqueue happened while blocked
 * (PENDING latched by taskqueue_enqueue), deliver the deferred kick now.
 */
239 taskqueue_unblock(struct taskqueue
*queue
)
242 queue
->tq_flags
&= ~TQ_FLAGS_BLOCKED
;
243 if (queue
->tq_flags
& TQ_FLAGS_PENDING
) {
244 queue
->tq_flags
&= ~TQ_FLAGS_PENDING
;
245 if (queue
->tq_enqueue
)
246 queue
->tq_enqueue(queue
->tq_context
);
/*
 * Drain the queue: pop tasks in order and invoke each handler with its
 * accumulated pending count. tq_running marks the in-flight task so
 * taskqueue_drain() can wait for it.
 *
 * lock_held: nonzero if the caller already holds tq_lock.
 * NOTE(review): the local declarations (task, pending), the conditional
 * TQ_LOCK/TQ_UNLOCK around the handler call, and the drain wakeup are
 * elided by extraction (numbering gaps 252->259, 268->271, 271->274).
 */
252 taskqueue_run(struct taskqueue
*queue
, int lock_held
)
259 while (STAILQ_FIRST(&queue
->tq_queue
)) {
261 * Carefully remove the first task from the queue and
262 * zero its pending count.
264 task
= STAILQ_FIRST(&queue
->tq_queue
);
265 STAILQ_REMOVE_HEAD(&queue
->tq_queue
, ta_link
);
/* Snapshot and clear ta_pending before the callback so re-enqueues
 * during the handler are counted for a later run. */
266 pending
= task
->ta_pending
;
267 task
->ta_pending
= 0;
268 queue
->tq_running
= task
;
/* Handler receives its context and the number of coalesced enqueues. */
271 task
->ta_func(task
->ta_context
, pending
);
274 queue
->tq_running
= NULL
;
/*
 * Block until the given task is neither queued (ta_pending != 0) nor
 * currently executing (tq_running). Sleeps on the task itself; the
 * corresponding wakeup (in taskqueue_run, elided here) fires when the
 * handler finishes.
 */
282 taskqueue_drain(struct taskqueue
*queue
, struct task
*task
)
285 while (task
->ta_pending
!= 0 || task
== queue
->tq_running
)
286 TQ_SLEEP(queue
, task
, "-");
/*
 * Enqueue callback for the SWI-driven queues.
 * NOTE(review): the body (presumably a setsofttq()/software-interrupt
 * trigger) is elided by extraction.
 */
291 taskqueue_swi_enqueue(void *context
)
/* SWI handler for the default software-interrupt taskqueue: run it with
 * lock_held = 0 (taskqueue_run takes tq_lock itself). */
297 taskqueue_swi_run(void *arg
, void *frame
)
299 taskqueue_run(taskqueue_swi
, 0);
/* SWI handler for the MP-safe software-interrupt taskqueue; same shape as
 * taskqueue_swi_run but drains taskqueue_swi_mp. */
303 taskqueue_swi_mp_run(void *arg
, void *frame
)
305 taskqueue_run(taskqueue_swi_mp
, 0);
/*
 * Create `count` LWKT worker threads for the queue, named from the
 * printf-style fmt, at initial priority `pri`, bound to `ncpu` (or
 * round-robined across CPUs when ncpu <= -1 and count > 1).
 *
 * NOTE(review): substantial pieces are elided by extraction — the va_list
 * setup around kvsnprintf, the kmalloc flags argument, the no-cpu-affinity
 * lwkt_create branch, the error-path braces, tq_tcount bookkeeping, and
 * the return value. Only the visible skeleton is documented here.
 */
309 taskqueue_start_threads(struct taskqueue
**tqp
, int count
, int pri
, int ncpu
,
310 const char *fmt
, ...)
314 struct taskqueue
*tq
;
/* Thread-name buffer; kvsnprintf bounds writes to MAXCOMLEN. */
316 char ktname
[MAXCOMLEN
];
325 kvsnprintf(ktname
, MAXCOMLEN
, fmt
, ap
);
/* Per-thread pointer array, owned by the queue (freed via M_TASKQUEUE). */
328 tq
->tq_threads
= kmalloc(sizeof(struct thread
*) * count
, M_TASKQUEUE
,
331 for (i
= 0; i
< count
; i
++) {
333 * If no specific cpu was specified and more than one thread
334 * is to be created, we distribute the threads amongst all
337 if ((ncpu
<= -1) && (count
> 1))
/* Threads start stopped (TDF_STOPREQ) and MP-safe; each runs
 * taskqueue_thread_loop with tqp as its argument. */
341 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
342 &tq
->tq_threads
[i
], NULL
, TDF_STOPREQ
| TDF_MPSAFE
,
345 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
346 &tq
->tq_threads
[i
], NULL
, TDF_STOPREQ
| TDF_MPSAFE
,
347 cpu
, "%s_%d", ktname
, i
);
/* Creation failure: log and NULL the slot so teardown skips it. */
350 kprintf("%s: kthread_add(%s): error %d", __func__
,
352 tq
->tq_threads
[i
] = NULL
;
354 td
= tq
->tq_threads
[i
];
355 lwkt_setpri_initial(td
, pri
);
/*
 * Worker-thread main loop: while the queue is ACTIVE, drain it and then
 * sleep on the queue until taskqueue_thread_enqueue wakes us. On exit,
 * wake the thread blocked in taskqueue_terminate().
 * NOTE(review): arg dereference (tq = *tqp), TQ_LOCK/TQ_UNLOCK, tq_tcount
 * decrement, and lwkt_exit are elided by extraction.
 */
365 taskqueue_thread_loop(void *arg
)
367 struct taskqueue
**tqp
, *tq
;
372 while ((tq
->tq_flags
& TQ_FLAGS_ACTIVE
) != 0) {
/* 1 = we hold tq_lock across the run (TQ_SLEEP releases it). */
373 taskqueue_run(tq
, 1);
374 TQ_SLEEP(tq
, tq
, "tqthr");
377 /* rendezvous with thread that asked us to terminate */
379 wakeup_one(tq
->tq_threads
);
/*
 * Enqueue callback for thread-backed queues: context is a struct
 * taskqueue ** (as registered in taskqueue_init).
 * NOTE(review): the body (presumably wakeup_one(tq) to rouse the worker
 * sleeping in taskqueue_thread_loop) is elided by extraction.
 */
385 taskqueue_thread_enqueue(void *context
)
387 struct taskqueue
**tqp
, *tq
;
/* Define the default SWI taskqueue; its init hook registers the software
 * interrupt that drains it via taskqueue_swi_run. */
395 TASKQUEUE_DEFINE(swi
, taskqueue_swi_enqueue
, 0,
396 register_swi(SWI_TQ
, taskqueue_swi_run
, NULL
, "swi_taskq", NULL
));
398 * XXX: possibly use a different SWI_TQ_MP or so.
399 * related: sys/interrupt.h
400 * related: platform/XXX/isa/ipl_funcs.c
/* MP-safe variant; note it reuses SWI_TQ (see the XXX above). */
402 TASKQUEUE_DEFINE(swi_mp
, taskqueue_swi_enqueue
, 0,
403 register_swi(SWI_TQ
, taskqueue_swi_mp_run
, NULL
, "swi_mp_taskq", NULL
));
/* One thread-backed taskqueue per CPU, populated by taskqueue_init.
 * CAUTION: tasks are not locked — do not share a task between CPUs
 * (see the NOTE above taskqueue_enqueue). */
405 struct taskqueue
*taskqueue_thread
[MAXCPU
];
/*
 * Subsystem initialization (body of taskqueue_init — the function header
 * is elided by extraction): set up the global registry, then create and
 * start one single-threaded per-cpu taskqueue for each online CPU.
 */
412 lockinit(&taskqueue_queues_lock
, "tqqueues", 0, 0);
413 STAILQ_INIT(&taskqueue_queues
);
415 for (cpu
= 0; cpu
= 0; cpu
< ncpus
; cpu
++) {
/* Each per-cpu queue's enqueue context is its own slot in the array,
 * matching what taskqueue_thread_enqueue expects (struct taskqueue **). */
416 taskqueue_thread
[cpu
] = taskqueue_create("thread", M_INTWAIT
,
417 taskqueue_thread_enqueue
, &taskqueue_thread
[cpu
]);
/* One worker thread per queue, pinned to its CPU. */
418 taskqueue_start_threads(&taskqueue_thread
[cpu
], 1,
419 TDPRI_KERN_DAEMON
, cpu
, "taskq_cpu %d", cpu
);
/* Run before drivers attach so they can enqueue tasks at attach time. */
423 SYSINIT(taskqueueinit
, SI_SUB_PRE_DRIVERS
, SI_ORDER_ANY
, taskqueue_init
, NULL
);