2006-10-27 James Livingston <doclivingston@gmail.com>
[rhythmbox.git] / lib / rb-thread.c
blobfb404ebe883470aa3f49113bb31dc73ffdb3b85a
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*-
3 * Copyright (C) 2005 Colin Walters <walters@verbum.org>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <config.h>
22 #include <string.h>
23 #include <stdlib.h>
25 #include <glib/gi18n.h>
27 #include "rb-thread.h"
28 #include "rb-debug.h"
30 static void rb_thread_class_init (RBThreadClass *klass);
31 static void rb_thread_init (RBThread *view);
32 static void rb_thread_finalize (GObject *object);
33 static void rb_thread_set_property (GObject *object,
34 guint prop_id,
35 const GValue *value,
36 GParamSpec *pspec);
37 static void rb_thread_get_property (GObject *object,
38 guint prop_id,
39 GValue *value,
40 GParamSpec *pspec);
41 static GObject * rb_thread_constructor (GType type, guint n_construct_properties,
42 GObjectConstructParam *construct_properties);
43 static gpointer rb_thread_action_thread_main (gpointer data);
45 struct RBThreadPrivate
47 GMainContext *context;
48 GMainContext *target_context;
49 GMainLoop *loop;
50 GAsyncQueue *action_queue;
51 GAsyncQueue *result_queue;
53 gint action_queue_processors;
55 gboolean thread_running;
56 GMutex *state_mutex;
57 GCond *state_condition;
59 GSList *queued_result_callbacks;
61 gboolean action_processor_queued;
63 RBThreadActionFunc action_func;
64 RBThreadResultFunc result_func;
65 RBThreadActionDestroyFunc action_destroy_func;
66 RBThreadResultDestroyFunc result_destroy_func;
67 gpointer user_data;
69 GThread *thread;
71 gint exit_flag;
74 #define RB_THREAD_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), RB_TYPE_THREAD, RBThreadPrivate))
76 enum
78 PROP_0,
79 PROP_CONTEXT,
80 PROP_ACTION_FUNC,
81 PROP_RESULT_FUNC,
82 PROP_ACTION_DESTROY,
83 PROP_RESULT_DESTROY,
84 PROP_DATA,
87 G_DEFINE_TYPE(RBThread, rb_thread, G_TYPE_OBJECT)
89 static void
90 rb_thread_class_init (RBThreadClass *klass)
92 GObjectClass *object_class = G_OBJECT_CLASS (klass);
94 object_class->finalize = rb_thread_finalize;
95 object_class->constructor = rb_thread_constructor;
97 object_class->set_property = rb_thread_set_property;
98 object_class->get_property = rb_thread_get_property;
100 g_object_class_install_property (object_class,
101 PROP_CONTEXT,
102 g_param_spec_pointer ("context",
103 "GMainContext",
104 "Context in which to invoke callbacks",
105 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
106 g_object_class_install_property (object_class,
107 PROP_ACTION_FUNC,
108 g_param_spec_pointer ("action-func",
109 "GFunc",
110 "Callback function",
111 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
112 g_object_class_install_property (object_class,
113 PROP_ACTION_DESTROY,
114 g_param_spec_pointer ("action-destroy",
115 "GFunc",
116 "Action destroy function",
117 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
118 g_object_class_install_property (object_class,
119 PROP_RESULT_FUNC,
120 g_param_spec_pointer ("result-func",
121 "GFunc",
122 "Callback function",
123 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
124 g_object_class_install_property (object_class,
125 PROP_RESULT_DESTROY,
126 g_param_spec_pointer ("result-destroy",
127 "GFunc",
128 "Result destroy function",
129 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
130 g_object_class_install_property (object_class,
131 PROP_DATA,
132 g_param_spec_pointer ("data",
133 "User data",
134 "User data",
135 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
137 g_type_class_add_private (klass, sizeof (RBThreadPrivate));
140 static void
141 rb_thread_init (RBThread *thread)
143 thread->priv = RB_THREAD_GET_PRIVATE (thread);
145 thread->priv->action_queue = g_async_queue_new ();
146 thread->priv->result_queue = g_async_queue_new ();
147 thread->priv->state_condition = g_cond_new ();
148 thread->priv->action_queue_processors = 0;
149 thread->priv->exit_flag = 0;
152 static void
153 rb_thread_finalize (GObject *object)
155 RBThread *thread;
156 gpointer action;
157 GSList *link;
159 g_return_if_fail (object != NULL);
160 g_return_if_fail (RB_IS_THREAD (object));
162 thread = RB_THREAD (object);
164 g_return_if_fail (thread->priv != NULL);
166 while ((action = g_async_queue_try_pop (thread->priv->action_queue)) != NULL)
167 thread->priv->action_destroy_func (action, thread->priv->user_data);
169 for (link = thread->priv->queued_result_callbacks; link; link = link->next) {
170 GSource *source = link->data;
171 g_source_destroy (source);
173 g_slist_free (thread->priv->queued_result_callbacks);
175 g_async_queue_unref (thread->priv->action_queue);
176 g_async_queue_unref (thread->priv->result_queue);
178 G_OBJECT_CLASS (rb_thread_parent_class)->finalize (object);
182 static void
183 rb_thread_set_property (GObject *object,
184 guint prop_id,
185 const GValue *value,
186 GParamSpec *pspec)
188 RBThread *thread = RB_THREAD (object);
190 switch (prop_id)
192 case PROP_CONTEXT:
193 thread->priv->context = g_value_get_pointer (value);
194 break;
195 case PROP_ACTION_FUNC:
196 thread->priv->action_func = g_value_get_pointer (value);
197 break;
198 case PROP_RESULT_FUNC:
199 thread->priv->action_func = g_value_get_pointer (value);
200 break;
201 case PROP_ACTION_DESTROY:
202 thread->priv->action_destroy_func = g_value_get_pointer (value);
203 break;
204 case PROP_RESULT_DESTROY:
205 thread->priv->result_destroy_func = g_value_get_pointer (value);
206 break;
207 case PROP_DATA:
208 thread->priv->user_data = g_value_get_pointer (value);
209 break;
210 default:
211 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
212 break;
216 static void
217 rb_thread_get_property (GObject *object,
218 guint prop_id,
219 GValue *value,
220 GParamSpec *pspec)
222 RBThread *thread = RB_THREAD (object);
224 switch (prop_id)
226 case PROP_CONTEXT:
227 g_value_set_pointer (value, thread->priv->context);
228 break;
229 case PROP_ACTION_FUNC:
230 g_value_set_pointer (value, thread->priv->action_func);
231 break;
232 case PROP_RESULT_FUNC:
233 g_value_set_pointer (value, thread->priv->result_func);
234 break;
235 case PROP_ACTION_DESTROY:
236 g_value_set_pointer (value, thread->priv->action_destroy_func);
237 break;
238 case PROP_RESULT_DESTROY:
239 g_value_set_pointer (value, thread->priv->result_destroy_func);
240 break;
241 case PROP_DATA:
242 g_value_set_pointer (value, thread->priv->user_data);
243 break;
244 default:
245 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
246 break;
250 RBThread *
251 rb_thread_new (GMainContext *context,
252 RBThreadActionFunc action_cb,
253 RBThreadResultFunc result_cb,
254 RBThreadActionDestroyFunc action_destroy_func,
255 RBThreadResultDestroyFunc result_destroy_func,
256 gpointer user_data)
258 return RB_THREAD (g_object_new (RB_TYPE_THREAD,
259 "context", context,
260 "action-func", action_cb,
261 "result-func", result_cb,
262 "destroyfunc", action_destroy_func,
263 "action-destroy", action_destroy_func,
264 "result-destroy", result_destroy_func,
265 "data", user_data, NULL));
268 static GObject *
269 rb_thread_constructor (GType type, guint n_construct_properties,
270 GObjectConstructParam *construct_properties)
272 RBThread *thread;
273 RBThreadClass *klass;
274 GObjectClass *parent_class;
275 GError *error = NULL;
277 klass = RB_THREAD_CLASS (g_type_class_peek (RB_TYPE_THREAD));
279 parent_class = G_OBJECT_CLASS (g_type_class_peek_parent (klass));
280 thread = RB_THREAD (parent_class->constructor (type, n_construct_properties,
281 construct_properties));
282 thread->priv->thread = g_thread_create (rb_thread_action_thread_main, thread, TRUE, &error);
283 if (!thread->priv->thread)
284 g_error ("Couldn't create thread: %s", error->message);
286 /* Wait until the thread's mainloop is running */
287 g_mutex_lock (thread->priv->state_mutex);
288 while (!thread->priv->thread_running)
289 g_cond_wait (thread->priv->state_condition, thread->priv->state_mutex);
290 g_mutex_unlock (thread->priv->state_mutex);
292 return G_OBJECT (thread);
295 struct RBThreadResultData
297 RBThread *thread;
298 GPtrArray *results;
301 static void
302 free_results (gpointer data)
304 struct RBThreadResultData *resultdata = data;
305 RBThread *thread = resultdata->thread;
306 guint i;
308 for (i = 0; i < resultdata->results->len; i++)
309 thread->priv->result_destroy_func (g_ptr_array_index (resultdata->results, i),
310 thread->priv->user_data);
312 g_ptr_array_free (resultdata->results, TRUE);
313 g_object_unref (thread);
314 g_free (resultdata);
317 static gboolean
318 process_results (gpointer data)
320 struct RBThreadResultData *resultdata = data;
321 RBThread *thread = resultdata->thread;
322 guint i;
324 for (i = 0; i < resultdata->results->len; i++)
325 thread->priv->result_func (g_ptr_array_index (resultdata->results, i), thread->priv->user_data);
326 return FALSE;
329 #define MAX_ACTIONS 10
331 /* This function as an idle handler in the mainloop of the action
332 * thread; it calls the action function on all queued action and
333 * gathers results, queueing the results for invocation by the
334 * result function in the context of the "target" mainloop
336 static gboolean
337 process_actions (gpointer data)
339 RBThread *thread = data;
340 gpointer action;
341 GPtrArray *results;
342 GSource *source;
343 struct RBThreadResultData *resultdata;
345 /* Invoke the action function on our queued actions, gathering results */
346 results = g_ptr_array_new ();
347 while (results->len < MAX_ACTIONS
348 && (action = g_async_queue_try_pop (thread->priv->action_queue))) {
349 gpointer result;
350 if (G_UNLIKELY (thread->priv->exit_flag))
351 break;
352 result = thread->priv->action_func (action, thread->priv->user_data, &(thread->priv->exit_flag));
353 thread->priv->action_destroy_func (action, thread->priv->user_data);
355 g_ptr_array_add (results, result);
357 /* Race condition here is irrelevant; see rb_thread_push_action */
358 g_atomic_int_dec_and_test (&(thread->priv->action_queue_processors));
360 resultdata = g_new0 (struct RBThreadResultData, 1);
361 resultdata->thread = g_object_ref (thread);
362 resultdata->results = results;
364 /* Now queue a callback in the target context */
365 source = g_idle_source_new ();
366 g_source_set_callback (source, process_results, resultdata, free_results);
367 g_source_attach (source, thread->priv->target_context);
368 g_source_unref (source);
370 return FALSE;
373 void
374 rb_thread_push_action (RBThread *thread,
375 gpointer action)
377 g_async_queue_push (thread->priv->action_queue, action);
378 /* Wake the thread up if necessary - note that it is not harmful if
379 * we queue this multiple times; hence the race between the test for
380 * zero and the inc is fine.
382 if (g_atomic_int_get (&thread->priv->action_queue_processors) == 0) {
383 GSource *source;
384 source = g_idle_source_new ();
386 g_source_set_callback (source, process_actions, thread, NULL);
387 g_source_attach (source, thread->priv->context);
388 g_source_unref (source);
389 g_atomic_int_inc (&(thread->priv->action_queue_processors));
393 static gboolean
394 mainloop_quit_cb (gpointer data)
396 RBThread *thread = data;
397 g_main_loop_quit (thread->priv->loop);
398 return FALSE;
401 void
402 rb_thread_terminate (RBThread *thread)
404 GSource *source;
406 /* Setting the exit flag stops processing in the idle function */
407 g_atomic_int_inc (&(thread->priv->exit_flag));
409 source = g_idle_source_new ();
410 g_source_set_callback (source, mainloop_quit_cb, thread, NULL);
411 g_source_attach (source, thread->priv->context);
412 g_source_unref (source);
414 g_thread_join (thread->priv->thread);
417 static gpointer
418 rb_thread_action_thread_main (gpointer data)
420 RBThread *thread = data;
422 g_mutex_lock (thread->priv->state_mutex);
423 thread->priv->thread_running = TRUE;
424 g_cond_broadcast (thread->priv->state_condition);
425 g_mutex_unlock (thread->priv->state_mutex);
427 thread->priv->context = g_main_context_new ();
428 thread->priv->loop = g_main_loop_new (thread->priv->context, TRUE);
430 rb_debug ("running");
432 g_main_loop_run (thread->priv->loop);
434 rb_debug ("exiting");
436 g_main_loop_unref (thread->priv->loop);
438 g_thread_exit (NULL);
439 return NULL;