Handle streams separately in tree_add_track()
[cmus.git] / worker.c
blobb936f3aa10a5846e8bb6cd4272ff308244cb9961
1 #include "worker.h"
2 #include "locking.h"
3 #include "list.h"
4 #include "xmalloc.h"
5 #include "debug.h"
7 #include <stdlib.h>
8 #include <pthread.h>
10 struct worker_job {
11 struct list_head node;
12 /* >0, 0 is 'any' */
13 int type;
14 void (*job_cb)(void *data);
15 void (*free_cb)(void *data);
16 void *data;
19 static LIST_HEAD(worker_job_head);
20 static pthread_mutex_t worker_mutex = CMUS_MUTEX_INITIALIZER;
21 static pthread_cond_t worker_cond = PTHREAD_COND_INITIALIZER;
22 static pthread_t worker_thread;
23 static int running = 1;
24 static int cancel_type = JOB_TYPE_NONE;
27 * - only worker thread modifies this
28 * - cur_job->job_cb can read this without locking
29 * - anyone else must lock worker before reading this
31 static struct worker_job *cur_job = NULL;
33 #define worker_lock() cmus_mutex_lock(&worker_mutex)
34 #define worker_unlock() cmus_mutex_unlock(&worker_mutex)
36 static void *worker_loop(void *arg)
38 worker_lock();
39 while (1) {
40 if (list_empty(&worker_job_head)) {
41 int rc;
43 if (!running)
44 break;
46 rc = pthread_cond_wait(&worker_cond, &worker_mutex);
47 if (rc)
48 d_print("pthread_cond_wait: %s\n", strerror(rc));
49 } else {
50 struct list_head *item = worker_job_head.next;
51 uint64_t t;
53 list_del(item);
54 cur_job = container_of(item, struct worker_job, node);
55 worker_unlock();
57 t = timer_get();
58 cur_job->job_cb(cur_job->data);
59 timer_print("worker job", timer_get() - t);
61 worker_lock();
62 cur_job->free_cb(cur_job->data);
63 free(cur_job);
64 cur_job = NULL;
66 // wakeup worker_remove_jobs() if needed
67 if (cancel_type != JOB_TYPE_NONE)
68 pthread_cond_signal(&worker_cond);
71 worker_unlock();
72 return NULL;
75 void worker_init(void)
77 int rc = pthread_create(&worker_thread, NULL, worker_loop, NULL);
79 BUG_ON(rc);
82 void worker_exit(void)
84 worker_lock();
85 running = 0;
86 pthread_cond_signal(&worker_cond);
87 worker_unlock();
89 pthread_join(worker_thread, NULL);
92 void worker_add_job(int type, void (*job_cb)(void *data),
93 void (*free_cb)(void *data), void *data)
95 struct worker_job *job;
97 job = xnew(struct worker_job, 1);
98 job->type = type;
99 job->job_cb = job_cb;
100 job->free_cb = free_cb;
101 job->data = data;
103 worker_lock();
104 list_add_tail(&job->node, &worker_job_head);
105 pthread_cond_signal(&worker_cond);
106 worker_unlock();
109 void worker_remove_jobs(int type)
111 struct list_head *item;
113 worker_lock();
114 cancel_type = type;
116 /* remove jobs of the specified type from the queue */
117 item = worker_job_head.next;
118 while (item != &worker_job_head) {
119 struct worker_job *job = container_of(item, struct worker_job, node);
120 struct list_head *next = item->next;
122 if (type == JOB_TYPE_ANY || type == job->type) {
123 list_del(&job->node);
124 job->free_cb(job->data);
125 free(job);
127 item = next;
130 /* wait current job to finish or cancel if it's of the specified type */
131 if (cur_job && (type == JOB_TYPE_ANY || type == cur_job->type))
132 pthread_cond_wait(&worker_cond, &worker_mutex);
134 cancel_type = JOB_TYPE_NONE;
135 worker_unlock();
138 int worker_has_job(int type)
140 struct worker_job *job;
141 int has_job = 0;
143 worker_lock();
144 list_for_each_entry(job, &worker_job_head, node) {
145 if (type == JOB_TYPE_ANY || type == job->type) {
146 has_job = 1;
147 break;
150 if (cur_job && (type == JOB_TYPE_ANY || type == cur_job->type))
151 has_job = 1;
152 worker_unlock();
153 return has_job;
157 * this is only called from the worker thread
158 * cur_job is guaranteed to be non-NULL
160 int worker_cancelling(void)
162 return cur_job->type == cancel_type || cancel_type == JOB_TYPE_ANY;