/*-
 * Copyright (c) 2000 Doug Rabson
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.1.2.3 2003/09/10 00:40:39 ken Exp $
 * $DragonFly: src/sys/kern/subr_taskqueue.c,v 1.13 2008/06/07 11:44:04 mneumann Exp $
 */
30 #include <sys/param.h>
31 #include <sys/queue.h>
32 #include <sys/systm.h>
33 #include <sys/kernel.h>
34 #include <sys/taskqueue.h>
35 #include <sys/interrupt.h>
37 #include <sys/malloc.h>
38 #include <sys/kthread.h>
39 #include <sys/thread2.h>
40 #include <sys/spinlock.h>
41 #include <sys/spinlock2.h>
42 #include <sys/serialize.h>
44 #include <machine/varargs.h>
46 MALLOC_DEFINE(M_TASKQUEUE
, "taskqueue", "Task Queues");
48 static STAILQ_HEAD(taskqueue_list
, taskqueue
) taskqueue_queues
;
49 static struct lock taskqueue_queues_lock
;
52 STAILQ_ENTRY(taskqueue
) tq_link
;
53 STAILQ_HEAD(, task
) tq_queue
;
55 taskqueue_enqueue_fn tq_enqueue
;
58 struct task
*tq_running
;
59 struct spinlock tq_lock
;
60 struct thread
**tq_threads
;
/*
 * Bits kept in taskqueue.tq_flags:
 *   ACTIVE  - set at creation, cleared by taskqueue_free(); enqueue tests it.
 *   BLOCKED - set by taskqueue_block(), cleared by taskqueue_unblock().
 *   PENDING - a tq_enqueue notification is owed; taskqueue_unblock()
 *             clears it and calls tq_enqueue.
 */
#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_PENDING	(1 << 2)
70 TQ_LOCK_INIT(struct taskqueue
*tq
)
72 spin_init(&tq
->tq_lock
);
76 TQ_LOCK_UNINIT(struct taskqueue
*tq
)
78 spin_uninit(&tq
->tq_lock
);
82 TQ_LOCK(struct taskqueue
*tq
)
84 spin_lock_wr(&tq
->tq_lock
);
88 TQ_UNLOCK(struct taskqueue
*tq
)
90 spin_unlock_wr(&tq
->tq_lock
);
94 TQ_SLEEP(struct taskqueue
*tq
, void *ident
, const char *wmesg
)
96 ssleep(ident
, &tq
->tq_lock
, 0, wmesg
, 0);
100 taskqueue_create(const char *name
, int mflags
,
101 taskqueue_enqueue_fn enqueue
, void *context
)
103 struct taskqueue
*queue
;
105 queue
= kmalloc(sizeof(*queue
), M_TASKQUEUE
, mflags
| M_ZERO
);
108 STAILQ_INIT(&queue
->tq_queue
);
109 queue
->tq_name
= name
;
110 queue
->tq_enqueue
= enqueue
;
111 queue
->tq_context
= context
;
112 queue
->tq_flags
|= TQ_FLAGS_ACTIVE
;
115 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
116 STAILQ_INSERT_TAIL(&taskqueue_queues
, queue
, tq_link
);
117 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
/*
 * taskqueue_terminate: loop for as long as tq->tq_tcount is positive,
 * sleeping on `pp' through TQ_SLEEP() on each pass.
 *
 * NOTE(review): this text comes from a damaged listing -- each line carries
 * a stray listing line number, tokens are split across lines, and some
 * listing lines (return type, braces, possibly a statement inside the loop)
 * are absent.  Restore from the upstream file before building.
 */
123 taskqueue_terminate(struct thread
**pp
, struct taskqueue
*tq
)
125 while(tq
->tq_tcount
> 0) {
127 TQ_SLEEP(tq
, pp
, "taskqueue_terminate");
/*
 * taskqueue_free: destroy a task queue.  The visible steps are, in order:
 * clear TQ_FLAGS_ACTIVE, call taskqueue_run() on the queue, call
 * taskqueue_terminate() with the queue's thread array, remove the queue
 * from taskqueue_queues under taskqueue_queues_lock, uninit the queue
 * lock, and kfree() the queue.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * braces, and possibly locking around the flag update and the terminate
 * call).  Restore from the upstream file before building.
 */
132 taskqueue_free(struct taskqueue
*queue
)
135 queue
->tq_flags
&= ~TQ_FLAGS_ACTIVE
;
138 taskqueue_run(queue
);
141 taskqueue_terminate(queue
->tq_threads
, queue
);
144 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
145 STAILQ_REMOVE(&taskqueue_queues
, queue
, taskqueue
, tq_link
);
146 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
148 TQ_LOCK_UNINIT(queue
);
150 kfree(queue
, M_TASKQUEUE
);
154 taskqueue_find(const char *name
)
156 struct taskqueue
*queue
;
158 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
159 STAILQ_FOREACH(queue
, &taskqueue_queues
, tq_link
) {
160 if (!strcmp(queue
->tq_name
, name
)) {
161 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
165 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
/*
 * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
 * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
 * So either use a throwaway task which will only be enqueued once, or
 * use one task per CPU!
 */
/*
 * taskqueue_enqueue: add `task' to `queue'.  Visible logic:
 *   - tests whether TQ_FLAGS_ACTIVE is clear (queue being freed);
 *   - tests whether task->ta_pending is already non-zero;
 *   - otherwise looks at the last queued task: if the queue is empty or
 *     the last task's ta_priority is >= the new task's, appends at the
 *     tail; else scans from the head for the first entry with a lower
 *     ta_priority and inserts via STAILQ_INSERT_AFTER (after `prev') or
 *     STAILQ_INSERT_HEAD;
 *   - sets ta_pending to 1, then, when TQ_FLAGS_BLOCKED is clear, calls
 *     tq_enqueue(tq_context) if a callback is set; the statement setting
 *     TQ_FLAGS_PENDING follows (presumably the blocked branch).
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * declarations of `ins'/`prev', the bodies of the two early tests, the
 * loop exit, any locking, and the return statements).  Restore from the
 * upstream file before building.
 */
176 taskqueue_enqueue(struct taskqueue
*queue
, struct task
*task
)
184 * Don't allow new tasks on a queue which is being freed.
186 if ((queue
->tq_flags
& TQ_FLAGS_ACTIVE
) == 0) {
192 * Count multiple enqueues.
194 if (task
->ta_pending
) {
201 * Optimise the case when all tasks have the same priority.
203 prev
= STAILQ_LAST(&queue
->tq_queue
, task
, ta_link
);
204 if (!prev
|| prev
->ta_priority
>= task
->ta_priority
) {
205 STAILQ_INSERT_TAIL(&queue
->tq_queue
, task
, ta_link
);
208 for (ins
= STAILQ_FIRST(&queue
->tq_queue
); ins
;
209 prev
= ins
, ins
= STAILQ_NEXT(ins
, ta_link
))
210 if (ins
->ta_priority
< task
->ta_priority
)
214 STAILQ_INSERT_AFTER(&queue
->tq_queue
, prev
, task
, ta_link
);
216 STAILQ_INSERT_HEAD(&queue
->tq_queue
, task
, ta_link
);
219 task
->ta_pending
= 1;
220 if ((queue
->tq_flags
& TQ_FLAGS_BLOCKED
) == 0) {
221 if (queue
->tq_enqueue
)
222 queue
->tq_enqueue(queue
->tq_context
);
224 queue
->tq_flags
|= TQ_FLAGS_PENDING
;
/*
 * taskqueue_block: set TQ_FLAGS_BLOCKED in queue->tq_flags.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * braces, possibly locking).  Restore from the upstream file before
 * building.
 */
233 taskqueue_block(struct taskqueue
*queue
)
236 queue
->tq_flags
|= TQ_FLAGS_BLOCKED
;
/*
 * taskqueue_unblock: clear TQ_FLAGS_BLOCKED.  If TQ_FLAGS_PENDING is set,
 * clear it as well and call tq_enqueue(tq_context) when a callback is set.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * braces, possibly locking).  Restore from the upstream file before
 * building.
 */
241 taskqueue_unblock(struct taskqueue
*queue
)
244 queue
->tq_flags
&= ~TQ_FLAGS_BLOCKED
;
245 if (queue
->tq_flags
& TQ_FLAGS_PENDING
) {
246 queue
->tq_flags
&= ~TQ_FLAGS_PENDING
;
247 if (queue
->tq_enqueue
)
248 queue
->tq_enqueue(queue
->tq_context
);
/*
 * taskqueue_run: drain the queue.  While the queue has a first element:
 * take it off the head, save its ta_pending count and reset it to 0,
 * record the task in tq_running, call ta_func(ta_context, pending), and
 * then set tq_running back to NULL.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * declarations of `task'/`pending', braces, and whatever sits between the
 * visible statements, e.g. locking or wakeups).  Restore from the upstream
 * file before building.
 */
254 taskqueue_run(struct taskqueue
*queue
)
260 while (STAILQ_FIRST(&queue
->tq_queue
)) {
262 * Carefully remove the first task from the queue and
263 * zero its pending count.
265 task
= STAILQ_FIRST(&queue
->tq_queue
);
266 STAILQ_REMOVE_HEAD(&queue
->tq_queue
, ta_link
);
267 pending
= task
->ta_pending
;
268 task
->ta_pending
= 0;
269 queue
->tq_running
= task
;
272 task
->ta_func(task
->ta_context
, pending
);
275 queue
->tq_running
= NULL
;
/*
 * taskqueue_drain: wait for `task' to finish.  Sleeps on `task' via
 * TQ_SLEEP() for as long as the task has a non-zero ta_pending count or is
 * the queue's currently running task (tq_running).
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * braces, possibly locking around the loop).  Restore from the upstream
 * file before building.
 */
282 taskqueue_drain(struct taskqueue
*queue
, struct task
*task
)
285 while (task
->ta_pending
!= 0 || task
== queue
->tq_running
)
286 TQ_SLEEP(queue
, task
, "-");
/*
 * taskqueue_swi_enqueue: enqueue callback passed to both TASKQUEUE_DEFINE
 * invocations (swi and swi_mp) in this file.
 *
 * NOTE(review): only the parameter list survives in this damaged listing;
 * the return type and the entire body are absent.  Restore from the
 * upstream file before building.
 */
291 taskqueue_swi_enqueue(void *context
)
297 taskqueue_swi_run(void *arg
, void *frame
)
299 taskqueue_run(taskqueue_swi
);
303 taskqueue_swi_mp_run(void *arg
, void *frame
)
305 taskqueue_run(taskqueue_swi_mp
);
/*
 * taskqueue_start_threads: create `count' service threads for *tqp.
 * Visible logic: format a thread name into ktname[MAXCOMLEN] from
 * `fmt' and the variadic arguments with kvsnprintf(); allocate an array of
 * `count' thread pointers into tq->tq_threads; then, per thread, call
 * lwkt_create() with taskqueue_thread_loop as entry point and `tqp' as its
 * argument, storing the thread in tq->tq_threads[i].  A failure path logs
 * with kprintf() and stores NULL in the slot; otherwise the thread's
 * priority is set to `pri' with lwkt_setpri().  When ncpu <= -1 and
 * count > 1 a special case applies (per the surviving comment, threads
 * are distributed rather than pinned to one cpu).
 *
 * NOTE(review): the kprintf() text names "kthread_add" although the call
 * that is visible is lwkt_create() -- the message looks stale.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and many absent lines (return
 * type, most declarations, va_start/va_end, the remaining kmalloc and
 * lwkt_create arguments, the error test, thread start-up, tq_tcount
 * accounting, and the return).  Restore from the upstream file before
 * building.
 */
309 taskqueue_start_threads(struct taskqueue
**tqp
, int count
, int pri
, int ncpu
,
310 const char *fmt
, ...)
314 struct taskqueue
*tq
;
316 char ktname
[MAXCOMLEN
];
325 kvsnprintf(ktname
, MAXCOMLEN
, fmt
, ap
);
328 tq
->tq_threads
= kmalloc(sizeof(struct thread
*) * count
, M_TASKQUEUE
,
331 for (i
= 0; i
< count
; i
++) {
333 * If no specific cpu was specified and more than one thread
334 * is to be created, we distribute the threads amongst all
337 if ((ncpu
<= -1) && (count
> 1))
340 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
341 &tq
->tq_threads
[i
], NULL
, TDF_STOPREQ
| TDF_MPSAFE
, cpu
,
344 kprintf("%s: kthread_add(%s): error %d", __func__
,
346 tq
->tq_threads
[i
] = NULL
;
348 td
= tq
->tq_threads
[i
];
349 lwkt_setpri(td
, pri
);
/*
 * taskqueue_thread_loop: entry point handed to lwkt_create() by
 * taskqueue_start_threads().  Visible logic: loop while the queue's
 * TQ_FLAGS_ACTIVE bit is set, sleeping on the queue itself ("tqthr") via
 * TQ_SLEEP(); once the loop ends, wakeup_one() is called on
 * tq->tq_threads, which is the ident taskqueue_free() passes to
 * taskqueue_terminate() to sleep on.
 *
 * NOTE(review): this text comes from a damaged listing -- stray listing
 * line numbers, tokens split across lines, and absent lines (return type,
 * the assignments of `tqp'/`tq' from `arg', locking, whatever work is done
 * inside the loop before sleeping, any tq_tcount update, and thread exit).
 * Restore from the upstream file before building.
 */
359 taskqueue_thread_loop(void *arg
)
361 struct taskqueue
**tqp
, *tq
;
366 while ((tq
->tq_flags
& TQ_FLAGS_ACTIVE
) != 0) {
368 TQ_SLEEP(tq
, tq
, "tqthr");
371 /* rendezvous with thread that asked us to terminate */
373 wakeup_one(tq
->tq_threads
);
/*
 * taskqueue_thread_enqueue: enqueue callback given to taskqueue_create()
 * for the per-cpu "thread" queues; the context registered there is the
 * address of the queue's slot in taskqueue_thread[].
 *
 * NOTE(review): only the parameter list and the local declarations survive
 * in this damaged listing; the return type and all statements are absent.
 * Presumably it wakes the thread sleeping on the queue -- confirm against
 * the upstream file and restore before building.
 */
379 taskqueue_thread_enqueue(void *context
)
381 struct taskqueue
**tqp
, *tq
;
389 TASKQUEUE_DEFINE(swi
, taskqueue_swi_enqueue
, 0,
390 register_swi(SWI_TQ
, taskqueue_swi_run
, NULL
, "swi_taskq", NULL
));
392 * XXX: possibly use a different SWI_TQ_MP or so.
393 * related: sys/interrupt.h
394 * related: platform/XXX/isa/ipl_funcs.c
396 TASKQUEUE_DEFINE(swi_mp
, taskqueue_swi_enqueue
, 0,
397 register_swi(SWI_TQ
, taskqueue_swi_mp_run
, NULL
, "swi_mp_taskq", NULL
));
399 struct taskqueue
*taskqueue_thread
[MAXCPU
];
400 static struct thread
*taskqueue_thread_td
[MAXCPU
];
/*
 * Body of the boot-time initialiser (named taskqueue_init in the SYSINIT
 * line of this file).  Visible logic: initialise taskqueue_queues_lock and
 * the taskqueue_queues list, then for each cpu below ncpus create a queue
 * named "thread" with taskqueue_thread_enqueue as its callback and the
 * address of its taskqueue_thread[] slot as context, and start one thread
 * for it at TDPRI_KERN_DAEMON on that cpu, named "taskq_cpu".
 *
 * NOTE(review): this text comes from a damaged listing -- the function
 * header, the declaration of `cpu' and the closing braces are absent, each
 * line carries a stray listing line number, and tokens are split across
 * lines.  Restore from the upstream file before building.
 */
407 lockinit(&taskqueue_queues_lock
, "tqqueues", 0, 0);
408 STAILQ_INIT(&taskqueue_queues
);
410 for (cpu
= 0; cpu
< ncpus
; cpu
++) {
411 taskqueue_thread
[cpu
] = taskqueue_create("thread", M_INTWAIT
,
412 taskqueue_thread_enqueue
, &taskqueue_thread
[cpu
]);
413 taskqueue_start_threads(&taskqueue_thread
[cpu
], 1,
414 TDPRI_KERN_DAEMON
, cpu
, "taskq_cpu");
/* Run taskqueue_init() at SI_SUB_PRE_DRIVERS / SI_ORDER_ANY during boot. */
SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);