/*
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.1.2.3 2003/09/10 00:40:39 ken Exp $
 */
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/thread2.h>
39 #include <sys/spinlock.h>
40 #include <sys/spinlock2.h>
41 #include <sys/serialize.h>
43 #include <machine/varargs.h>
45 MALLOC_DEFINE(M_TASKQUEUE
, "taskqueue", "Task Queues");
47 static STAILQ_HEAD(taskqueue_list
, taskqueue
) taskqueue_queues
;
48 static struct lock taskqueue_queues_lock
;
51 STAILQ_ENTRY(taskqueue
) tq_link
;
52 STAILQ_HEAD(, task
) tq_queue
;
54 taskqueue_enqueue_fn tq_enqueue
;
57 struct task
*tq_running
;
58 struct spinlock tq_lock
;
59 struct thread
**tq_threads
;
#define TQ_FLAGS_ACTIVE		(1 << 0)	/* queue accepts new tasks */
#define TQ_FLAGS_BLOCKED	(1 << 1)	/* defer tq_enqueue callbacks */
#define TQ_FLAGS_PENDING	(1 << 2)	/* an enqueue was deferred while blocked */
/* Forward declaration; lock_held != 0 means the caller holds tq_lock. */
static void	taskqueue_run(struct taskqueue *queue, int lock_held);
71 TQ_LOCK_INIT(struct taskqueue
*tq
)
73 spin_init(&tq
->tq_lock
);
77 TQ_LOCK_UNINIT(struct taskqueue
*tq
)
79 spin_uninit(&tq
->tq_lock
);
83 TQ_LOCK(struct taskqueue
*tq
)
85 spin_lock(&tq
->tq_lock
);
89 TQ_UNLOCK(struct taskqueue
*tq
)
91 spin_unlock(&tq
->tq_lock
);
95 TQ_SLEEP(struct taskqueue
*tq
, void *ident
, const char *wmesg
)
97 ssleep(ident
, &tq
->tq_lock
, 0, wmesg
, 0);
101 taskqueue_create(const char *name
, int mflags
,
102 taskqueue_enqueue_fn enqueue
, void *context
)
104 struct taskqueue
*queue
;
106 queue
= kmalloc(sizeof(*queue
), M_TASKQUEUE
, mflags
| M_ZERO
);
109 STAILQ_INIT(&queue
->tq_queue
);
110 queue
->tq_name
= name
;
111 queue
->tq_enqueue
= enqueue
;
112 queue
->tq_context
= context
;
113 queue
->tq_flags
|= TQ_FLAGS_ACTIVE
;
116 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
117 STAILQ_INSERT_TAIL(&taskqueue_queues
, queue
, tq_link
);
118 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
124 taskqueue_terminate(struct thread
**pp
, struct taskqueue
*tq
)
126 while(tq
->tq_tcount
> 0) {
128 TQ_SLEEP(tq
, pp
, "taskqueue_terminate");
133 taskqueue_free(struct taskqueue
*queue
)
136 queue
->tq_flags
&= ~TQ_FLAGS_ACTIVE
;
137 taskqueue_run(queue
, 1);
138 taskqueue_terminate(queue
->tq_threads
, queue
);
141 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
142 STAILQ_REMOVE(&taskqueue_queues
, queue
, taskqueue
, tq_link
);
143 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
145 TQ_LOCK_UNINIT(queue
);
147 kfree(queue
, M_TASKQUEUE
);
151 taskqueue_find(const char *name
)
153 struct taskqueue
*queue
;
155 lockmgr(&taskqueue_queues_lock
, LK_EXCLUSIVE
);
156 STAILQ_FOREACH(queue
, &taskqueue_queues
, tq_link
) {
157 if (!strcmp(queue
->tq_name
, name
)) {
158 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
162 lockmgr(&taskqueue_queues_lock
, LK_RELEASE
);
/*
 * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
 * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
 * So either use a throwaway task which will only be enqueued once, or
 * use one task per CPU!
 */
173 taskqueue_enqueue(struct taskqueue
*queue
, struct task
*task
)
181 * Don't allow new tasks on a queue which is being freed.
183 if ((queue
->tq_flags
& TQ_FLAGS_ACTIVE
) == 0) {
189 * Count multiple enqueues.
191 if (task
->ta_pending
) {
198 * Optimise the case when all tasks have the same priority.
200 prev
= STAILQ_LAST(&queue
->tq_queue
, task
, ta_link
);
201 if (!prev
|| prev
->ta_priority
>= task
->ta_priority
) {
202 STAILQ_INSERT_TAIL(&queue
->tq_queue
, task
, ta_link
);
205 for (ins
= STAILQ_FIRST(&queue
->tq_queue
); ins
;
206 prev
= ins
, ins
= STAILQ_NEXT(ins
, ta_link
))
207 if (ins
->ta_priority
< task
->ta_priority
)
211 STAILQ_INSERT_AFTER(&queue
->tq_queue
, prev
, task
, ta_link
);
213 STAILQ_INSERT_HEAD(&queue
->tq_queue
, task
, ta_link
);
216 task
->ta_pending
= 1;
217 if ((queue
->tq_flags
& TQ_FLAGS_BLOCKED
) == 0) {
218 if (queue
->tq_enqueue
)
219 queue
->tq_enqueue(queue
->tq_context
);
221 queue
->tq_flags
|= TQ_FLAGS_PENDING
;
230 taskqueue_block(struct taskqueue
*queue
)
233 queue
->tq_flags
|= TQ_FLAGS_BLOCKED
;
238 taskqueue_unblock(struct taskqueue
*queue
)
241 queue
->tq_flags
&= ~TQ_FLAGS_BLOCKED
;
242 if (queue
->tq_flags
& TQ_FLAGS_PENDING
) {
243 queue
->tq_flags
&= ~TQ_FLAGS_PENDING
;
244 if (queue
->tq_enqueue
)
245 queue
->tq_enqueue(queue
->tq_context
);
251 taskqueue_run(struct taskqueue
*queue
, int lock_held
)
258 while (STAILQ_FIRST(&queue
->tq_queue
)) {
260 * Carefully remove the first task from the queue and
261 * zero its pending count.
263 task
= STAILQ_FIRST(&queue
->tq_queue
);
264 STAILQ_REMOVE_HEAD(&queue
->tq_queue
, ta_link
);
265 pending
= task
->ta_pending
;
266 task
->ta_pending
= 0;
267 queue
->tq_running
= task
;
270 task
->ta_func(task
->ta_context
, pending
);
273 queue
->tq_running
= NULL
;
281 taskqueue_drain(struct taskqueue
*queue
, struct task
*task
)
284 while (task
->ta_pending
!= 0 || task
== queue
->tq_running
)
285 TQ_SLEEP(queue
, task
, "-");
/*
 * Enqueue hook shared by the two software-interrupt taskqueues.
 * NOTE(review): the function body (and return-type line) is not
 * visible in this extraction; presumably it schedules the taskqueue
 * software interrupt — confirm against upstream subr_taskqueue.c.
 */
290 taskqueue_swi_enqueue(void *context
)
296 taskqueue_swi_run(void *arg
, void *frame
)
298 taskqueue_run(taskqueue_swi
, 0);
302 taskqueue_swi_mp_run(void *arg
, void *frame
)
304 taskqueue_run(taskqueue_swi_mp
, 0);
/*
 * Create `count' LWKT worker threads for the taskqueue pointed to by
 * *tqp, named from the printf-style `fmt', at initial priority `pri'.
 * With ncpu <= -1 and count > 1 the threads are spread across CPUs;
 * otherwise they are bound per `ncpu'.
 *
 * NOTE(review): many interior lines of this function are missing from
 * this extraction (va_list setup, kmalloc flags, the lwkt_create()
 * trailing arguments, the error/else structure and the return value) —
 * the fragments below are kept verbatim; reconcile with upstream
 * before editing.
 */
308 taskqueue_start_threads(struct taskqueue
**tqp
, int count
, int pri
, int ncpu
,
309 const char *fmt
, ...)
313 struct taskqueue
*tq
;
315 char ktname
[MAXCOMLEN
];
/* Format the base kthread name from fmt/ap. */
324 kvsnprintf(ktname
, MAXCOMLEN
, fmt
, ap
);
/* Allocate the worker-thread pointer array. */
327 tq
->tq_threads
= kmalloc(sizeof(struct thread
*) * count
, M_TASKQUEUE
,
/* Create each worker thread. */
330 for (i
= 0; i
< count
; i
++) {
332 * If no specific cpu was specified and more than one thread
333 * is to be created, we distribute the threads amongst all
336 if ((ncpu
<= -1) && (count
> 1))
/* Single-thread case: plain name. */
340 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
341 &tq
->tq_threads
[i
], NULL
,
/* Multi-thread case: name gets an _%d suffix (presumably). */
345 error
= lwkt_create(taskqueue_thread_loop
, tqp
,
346 &tq
->tq_threads
[i
], NULL
,
/* On failure, log and leave the slot empty. */
351 kprintf("%s: lwkt_create(%s): error %d", __func__
,
353 tq
->tq_threads
[i
] = NULL
;
/* On success, set the thread's initial priority. */
355 td
= tq
->tq_threads
[i
];
356 lwkt_setpri_initial(td
, pri
);
366 taskqueue_thread_loop(void *arg
)
368 struct taskqueue
**tqp
, *tq
;
373 while ((tq
->tq_flags
& TQ_FLAGS_ACTIVE
) != 0) {
374 taskqueue_run(tq
, 1);
375 TQ_SLEEP(tq
, tq
, "tqthr");
378 /* rendezvous with thread that asked us to terminate */
380 wakeup_one(tq
->tq_threads
);
/*
 * Enqueue hook for thread-backed taskqueues: wake one worker thread
 * sleeping in taskqueue_thread_loop().  `context' is the pointer to
 * the queue pointer, matching what taskqueue_init() registers.
 *
 * NOTE(review): the body was reconstructed from dropped lines —
 * confirm upstream.
 */
static void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_one(tq);
}
396 TASKQUEUE_DEFINE(swi
, taskqueue_swi_enqueue
, 0,
397 register_swi(SWI_TQ
, taskqueue_swi_run
, NULL
, "swi_taskq", NULL
));
/*
 * XXX: possibly use a different SWI_TQ_MP or so.
 * related: sys/interrupt.h
 * related: platform/XXX/isa/ipl_funcs.c
 */
403 TASKQUEUE_DEFINE(swi_mp
, taskqueue_swi_enqueue
, 0,
404 register_swi(SWI_TQ
, taskqueue_swi_mp_run
, NULL
, "swi_mp_taskq", NULL
));
406 struct taskqueue
*taskqueue_thread
[MAXCPU
];
413 lockinit(&taskqueue_queues_lock
, "tqqueues", 0, 0);
414 STAILQ_INIT(&taskqueue_queues
);
416 for (cpu
= 0; cpu
< ncpus
; cpu
++) {
417 taskqueue_thread
[cpu
] = taskqueue_create("thread", M_INTWAIT
,
418 taskqueue_thread_enqueue
, &taskqueue_thread
[cpu
]);
419 taskqueue_start_threads(&taskqueue_thread
[cpu
], 1,
420 TDPRI_KERN_DAEMON
, cpu
, "taskq_cpu %d", cpu
);
/* Run taskqueue_init() before drivers attach. */
SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);