From 6b26ab14487d24407e2ddbb5a33147badae41622 Mon Sep 17 00:00:00 2001 From: Alex Hornung Date: Fri, 2 Oct 2009 13:20:30 +0100 Subject: [PATCH] taskqueue - Improve _start_threads, refactor code * Change taskqueue_start_threads to take another argument, ncpu, which specifies on what cpu the thread(s) should be created. If ncpu is <= -1 and count is > 1, then each of the count threads is allocated in a round robin fashion to each cpu. * Switch the per-cpu taskqueues to use the new taskqueue threads API. Discussed-with: Simon "corecode" Schubert --- sys/kern/subr_taskqueue.c | 44 ++++++++++++++++---------------------------- sys/sys/taskqueue.h | 3 ++- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/sys/kern/subr_taskqueue.c b/sys/kern/subr_taskqueue.c index 9b9abb17c4..e160730f02 100644 --- a/sys/kern/subr_taskqueue.c +++ b/sys/kern/subr_taskqueue.c @@ -306,19 +306,20 @@ taskqueue_swi_mp_run(void *arg, void *frame) } int -taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, +taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu, const char *fmt, ...) { __va_list ap; struct thread *td; struct taskqueue *tq; - int i, error; + int i, error, cpu; char ktname[MAXCOMLEN]; if (count <= 0) return EINVAL; tq = *tqp; + cpu = ncpu; __va_start(ap, fmt); kvsnprintf(ktname, MAXCOMLEN, fmt, ap); @@ -328,8 +329,16 @@ taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, M_WAITOK | M_ZERO); for (i = 0; i < count; i++) { + /* + * If no specific cpu was specified and more than one thread + * is to be created, we distribute the threads amongst all + * cpus. + */ + if ((ncpu <= -1) && (count > 1)) + cpu = i%ncpus; + error = lwkt_create(taskqueue_thread_loop, tqp, - &tq->tq_threads[i], NULL, TDF_STOPREQ | TDF_MPSAFE, -1, + &tq->tq_threads[i], NULL, TDF_STOPREQ | TDF_MPSAFE, cpu, "%s_%d", ktname, i); if (error) { kprintf("%s: kthread_add(%s): error %d", __func__, @@ -356,7 +365,7 @@ taskqueue_thread_loop(void *arg) TQ_LOCK(tq); while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { taskqueue_run(tq); - TQ_SLEEP(tq, tq, "-"); + TQ_SLEEP(tq, tq, "tqthr"); } /* rendezvous with thread that asked us to terminate */ @@ -387,26 +396,6 @@ TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0, TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0, register_swi(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL)); -static void -taskqueue_kthread(void *arg) -{ - struct taskqueue *queue = taskqueue_thread[mycpuid]; - - for (;;) { - taskqueue_run(queue); - TQ_LOCK(queue); - if (STAILQ_EMPTY(&queue->tq_queue)) - TQ_SLEEP(queue, queue, "tqthr"); - TQ_UNLOCK(queue); - } -} - -static void -taskqueue_kthread_enqueue(void *context) -{ - wakeup(taskqueue_thread[mycpuid]); -} - struct taskqueue *taskqueue_thread[MAXCPU]; static struct thread *taskqueue_thread_td[MAXCPU]; @@ -420,10 +409,9 @@ taskqueue_init(void) for (cpu = 0; cpu < ncpus; cpu++) { taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT, - taskqueue_kthread_enqueue, NULL); - lwkt_create(taskqueue_kthread, NULL, - &taskqueue_thread_td[cpu], NULL, - 0, cpu, "taskqueue %d", cpu); + taskqueue_thread_enqueue, &taskqueue_thread[cpu]); + taskqueue_start_threads(&taskqueue_thread[cpu], 1, + TDPRI_KERN_DAEMON, cpu, "taskq_cpu"); } } diff --git a/sys/sys/taskqueue.h b/sys/sys/taskqueue.h index 334e472372..d07d555a70 100644 --- a/sys/sys/taskqueue.h +++ b/sys/sys/taskqueue.h @@ -67,7 +67,8 @@ struct taskqueue *taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context); int taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, - const char *name, ...) __printflike(4, 5); + int ncpu, const char *name, ...) + __printflike(5, 6); int taskqueue_enqueue(struct taskqueue *queue, struct task *task); void taskqueue_drain(struct taskqueue *queue, struct task *task); struct taskqueue *taskqueue_find(const char *name); -- 2.11.4.GIT