/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*-
 *
 *  Copyright (C) 2005 Colin Walters <walters@verbum.org>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA.
 *
 */
25 #include <glib/gi18n.h>
27 #include "rb-thread.h"
30 static void rb_thread_class_init (RBThreadClass
*klass
);
31 static void rb_thread_init (RBThread
*view
);
32 static void rb_thread_finalize (GObject
*object
);
33 static void rb_thread_set_property (GObject
*object
,
37 static void rb_thread_get_property (GObject
*object
,
41 static GObject
* rb_thread_constructor (GType type
, guint n_construct_properties
,
42 GObjectConstructParam
*construct_properties
);
43 static gpointer
rb_thread_action_thread_main (gpointer data
);
45 struct RBThreadPrivate
47 GMainContext
*context
;
48 GMainContext
*target_context
;
50 GAsyncQueue
*action_queue
;
51 GAsyncQueue
*result_queue
;
53 gint action_queue_processors
;
55 gboolean thread_running
;
57 GCond
*state_condition
;
59 GSList
*queued_result_callbacks
;
61 gboolean action_processor_queued
;
63 RBThreadActionFunc action_func
;
64 RBThreadResultFunc result_func
;
65 RBThreadActionDestroyFunc action_destroy_func
;
66 RBThreadResultDestroyFunc result_destroy_func
;
/* Looks up the RBThreadPrivate area of the RBThread instance `o`. */
#define RB_THREAD_GET_PRIVATE(o) \
	(G_TYPE_INSTANCE_GET_PRIVATE ((o), RB_TYPE_THREAD, RBThreadPrivate))
87 G_DEFINE_TYPE(RBThread
, rb_thread
, G_TYPE_OBJECT
)
90 rb_thread_class_init (RBThreadClass
*klass
)
92 GObjectClass
*object_class
= G_OBJECT_CLASS (klass
);
94 object_class
->finalize
= rb_thread_finalize
;
95 object_class
->constructor
= rb_thread_constructor
;
97 object_class
->set_property
= rb_thread_set_property
;
98 object_class
->get_property
= rb_thread_get_property
;
100 g_object_class_install_property (object_class
,
102 g_param_spec_pointer ("context",
104 "Context in which to invoke callbacks",
105 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
106 g_object_class_install_property (object_class
,
108 g_param_spec_pointer ("action-func",
111 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
112 g_object_class_install_property (object_class
,
114 g_param_spec_pointer ("action-destroy",
116 "Action destroy function",
117 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
118 g_object_class_install_property (object_class
,
120 g_param_spec_pointer ("result-func",
123 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
124 g_object_class_install_property (object_class
,
126 g_param_spec_pointer ("result-destroy",
128 "Result destroy function",
129 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
130 g_object_class_install_property (object_class
,
132 g_param_spec_pointer ("data",
135 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
137 g_type_class_add_private (klass
, sizeof (RBThreadPrivate
));
141 rb_thread_init (RBThread
*thread
)
143 thread
->priv
= RB_THREAD_GET_PRIVATE (thread
);
145 thread
->priv
->action_queue
= g_async_queue_new ();
146 thread
->priv
->result_queue
= g_async_queue_new ();
147 thread
->priv
->state_condition
= g_cond_new ();
148 thread
->priv
->action_queue_processors
= 0;
149 thread
->priv
->exit_flag
= 0;
153 rb_thread_finalize (GObject
*object
)
159 g_return_if_fail (object
!= NULL
);
160 g_return_if_fail (RB_IS_THREAD (object
));
162 thread
= RB_THREAD (object
);
164 g_return_if_fail (thread
->priv
!= NULL
);
166 while ((action
= g_async_queue_try_pop (thread
->priv
->action_queue
)) != NULL
)
167 thread
->priv
->action_destroy_func (action
, thread
->priv
->user_data
);
169 for (link
= thread
->priv
->queued_result_callbacks
; link
; link
= link
->next
) {
170 GSource
*source
= link
->data
;
171 g_source_destroy (source
);
173 g_slist_free (thread
->priv
->queued_result_callbacks
);
175 g_async_queue_unref (thread
->priv
->action_queue
);
176 g_async_queue_unref (thread
->priv
->result_queue
);
178 G_OBJECT_CLASS (rb_thread_parent_class
)->finalize (object
);
183 rb_thread_set_property (GObject
*object
,
188 RBThread
*thread
= RB_THREAD (object
);
193 thread
->priv
->context
= g_value_get_pointer (value
);
195 case PROP_ACTION_FUNC
:
196 thread
->priv
->action_func
= g_value_get_pointer (value
);
198 case PROP_RESULT_FUNC
:
199 thread
->priv
->action_func
= g_value_get_pointer (value
);
201 case PROP_ACTION_DESTROY
:
202 thread
->priv
->action_destroy_func
= g_value_get_pointer (value
);
204 case PROP_RESULT_DESTROY
:
205 thread
->priv
->result_destroy_func
= g_value_get_pointer (value
);
208 thread
->priv
->user_data
= g_value_get_pointer (value
);
211 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
217 rb_thread_get_property (GObject
*object
,
222 RBThread
*thread
= RB_THREAD (object
);
227 g_value_set_pointer (value
, thread
->priv
->context
);
229 case PROP_ACTION_FUNC
:
230 g_value_set_pointer (value
, thread
->priv
->action_func
);
232 case PROP_RESULT_FUNC
:
233 g_value_set_pointer (value
, thread
->priv
->result_func
);
235 case PROP_ACTION_DESTROY
:
236 g_value_set_pointer (value
, thread
->priv
->action_destroy_func
);
238 case PROP_RESULT_DESTROY
:
239 g_value_set_pointer (value
, thread
->priv
->result_destroy_func
);
242 g_value_set_pointer (value
, thread
->priv
->user_data
);
245 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
251 rb_thread_new (GMainContext
*context
,
252 RBThreadActionFunc action_cb
,
253 RBThreadResultFunc result_cb
,
254 RBThreadActionDestroyFunc action_destroy_func
,
255 RBThreadResultDestroyFunc result_destroy_func
,
258 return RB_THREAD (g_object_new (RB_TYPE_THREAD
,
260 "action-func", action_cb
,
261 "result-func", result_cb
,
262 "destroyfunc", action_destroy_func
,
263 "action-destroy", action_destroy_func
,
264 "result-destroy", result_destroy_func
,
265 "data", user_data
, NULL
));
269 rb_thread_constructor (GType type
, guint n_construct_properties
,
270 GObjectConstructParam
*construct_properties
)
273 RBThreadClass
*klass
;
274 GObjectClass
*parent_class
;
275 GError
*error
= NULL
;
277 klass
= RB_THREAD_CLASS (g_type_class_peek (RB_TYPE_THREAD
));
279 parent_class
= G_OBJECT_CLASS (g_type_class_peek_parent (klass
));
280 thread
= RB_THREAD (parent_class
->constructor (type
, n_construct_properties
,
281 construct_properties
));
282 thread
->priv
->thread
= g_thread_create (rb_thread_action_thread_main
, thread
, TRUE
, &error
);
283 if (!thread
->priv
->thread
)
284 g_error ("Couldn't create thread: %s", error
->message
);
286 /* Wait until the thread's mainloop is running */
287 g_mutex_lock (thread
->priv
->state_mutex
);
288 while (!thread
->priv
->thread_running
)
289 g_cond_wait (thread
->priv
->state_condition
, thread
->priv
->state_mutex
);
290 g_mutex_unlock (thread
->priv
->state_mutex
);
292 return G_OBJECT (thread
);
295 struct RBThreadResultData
302 free_results (gpointer data
)
304 struct RBThreadResultData
*resultdata
= data
;
305 RBThread
*thread
= resultdata
->thread
;
308 for (i
= 0; i
< resultdata
->results
->len
; i
++)
309 thread
->priv
->result_destroy_func (g_ptr_array_index (resultdata
->results
, i
),
310 thread
->priv
->user_data
);
312 g_ptr_array_free (resultdata
->results
, TRUE
);
313 g_object_unref (thread
);
318 process_results (gpointer data
)
320 struct RBThreadResultData
*resultdata
= data
;
321 RBThread
*thread
= resultdata
->thread
;
324 for (i
= 0; i
< resultdata
->results
->len
; i
++)
325 thread
->priv
->result_func (g_ptr_array_index (resultdata
->results
, i
), thread
->priv
->user_data
);
329 #define MAX_ACTIONS 10
331 /* This function as an idle handler in the mainloop of the action
332 * thread; it calls the action function on all queued action and
333 * gathers results, queueing the results for invocation by the
334 * result function in the context of the "target" mainloop
337 process_actions (gpointer data
)
339 RBThread
*thread
= data
;
343 struct RBThreadResultData
*resultdata
;
345 /* Invoke the action function on our queued actions, gathering results */
346 results
= g_ptr_array_new ();
347 while (results
->len
< MAX_ACTIONS
348 && (action
= g_async_queue_try_pop (thread
->priv
->action_queue
))) {
350 if (G_UNLIKELY (thread
->priv
->exit_flag
))
352 result
= thread
->priv
->action_func (action
, thread
->priv
->user_data
, &(thread
->priv
->exit_flag
));
353 thread
->priv
->action_destroy_func (action
, thread
->priv
->user_data
);
355 g_ptr_array_add (results
, result
);
357 /* Race condition here is irrelevant; see rb_thread_push_action */
358 g_atomic_int_dec_and_test (&(thread
->priv
->action_queue_processors
));
360 resultdata
= g_new0 (struct RBThreadResultData
, 1);
361 resultdata
->thread
= g_object_ref (thread
);
362 resultdata
->results
= results
;
364 /* Now queue a callback in the target context */
365 source
= g_idle_source_new ();
366 g_source_set_callback (source
, process_results
, resultdata
, free_results
);
367 g_source_attach (source
, thread
->priv
->target_context
);
368 g_source_unref (source
);
374 rb_thread_push_action (RBThread
*thread
,
377 g_async_queue_push (thread
->priv
->action_queue
, action
);
378 /* Wake the thread up if necessary - note that it is not harmful if
379 * we queue this multiple times; hence the race between the test for
380 * zero and the inc is fine.
382 if (g_atomic_int_get (&thread
->priv
->action_queue_processors
) == 0) {
384 source
= g_idle_source_new ();
386 g_source_set_callback (source
, process_actions
, thread
, NULL
);
387 g_source_attach (source
, thread
->priv
->context
);
388 g_source_unref (source
);
389 g_atomic_int_inc (&(thread
->priv
->action_queue_processors
));
394 mainloop_quit_cb (gpointer data
)
396 RBThread
*thread
= data
;
397 g_main_loop_quit (thread
->priv
->loop
);
402 rb_thread_terminate (RBThread
*thread
)
406 /* Setting the exit flag stops processing in the idle function */
407 g_atomic_int_inc (&(thread
->priv
->exit_flag
));
409 source
= g_idle_source_new ();
410 g_source_set_callback (source
, mainloop_quit_cb
, thread
, NULL
);
411 g_source_attach (source
, thread
->priv
->context
);
412 g_source_unref (source
);
414 g_thread_join (thread
->priv
->thread
);
418 rb_thread_action_thread_main (gpointer data
)
420 RBThread
*thread
= data
;
422 g_mutex_lock (thread
->priv
->state_mutex
);
423 thread
->priv
->thread_running
= TRUE
;
424 g_cond_broadcast (thread
->priv
->state_condition
);
425 g_mutex_unlock (thread
->priv
->state_mutex
);
427 thread
->priv
->context
= g_main_context_new ();
428 thread
->priv
->loop
= g_main_loop_new (thread
->priv
->context
, TRUE
);
430 rb_debug ("running");
432 g_main_loop_run (thread
->priv
->loop
);
434 rb_debug ("exiting");
436 g_main_loop_unref (thread
->priv
->loop
);
438 g_thread_exit (NULL
);