1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*-
3 * Copyright (C) 2005 Colin Walters <walters@verbum.org>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #include <glib/gi18n.h>
27 #include "rb-thread.h"
30 static void rb_thread_class_init (RBThreadClass
*klass
);
31 static void rb_thread_init (RBThread
*view
);
32 static void rb_thread_finalize (GObject
*object
);
33 static void rb_thread_set_property (GObject
*object
,
37 static void rb_thread_get_property (GObject
*object
,
41 static GObject
* rb_thread_constructor (GType type
, guint n_construct_properties
,
42 GObjectConstructParam
*construct_properties
);
43 static gpointer
rb_thread_action_thread_main (gpointer data
);
45 struct RBThreadPrivate
47 GMainContext
*context
;
48 GMainContext
*target_context
;
50 GAsyncQueue
*action_queue
;
51 GAsyncQueue
*result_queue
;
53 gint action_queue_processors
;
55 gboolean thread_running
;
57 GCond
*state_condition
;
59 GSList
*queued_result_callbacks
;
61 gboolean action_processor_queued
;
63 RBThreadActionFunc action_func
;
64 RBThreadResultFunc result_func
;
65 RBThreadActionDestroyFunc action_destroy_func
;
66 RBThreadResultDestroyFunc result_destroy_func
;
74 #define RB_THREAD_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), RB_TYPE_THREAD, RBThreadPrivate))
87 G_DEFINE_TYPE(RBThread
, rb_thread
, G_TYPE_OBJECT
)
89 static GObjectClass
*parent_class
= NULL
;
92 rb_thread_class_init (RBThreadClass
*klass
)
94 GObjectClass
*object_class
= G_OBJECT_CLASS (klass
);
96 parent_class
= g_type_class_peek_parent (klass
);
98 object_class
->finalize
= rb_thread_finalize
;
99 object_class
->constructor
= rb_thread_constructor
;
101 object_class
->set_property
= rb_thread_set_property
;
102 object_class
->get_property
= rb_thread_get_property
;
104 g_object_class_install_property (object_class
,
106 g_param_spec_pointer ("context",
108 "Context in which to invoke callbacks",
109 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
110 g_object_class_install_property (object_class
,
112 g_param_spec_pointer ("action-func",
115 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
116 g_object_class_install_property (object_class
,
118 g_param_spec_pointer ("action-destroy",
120 "Action destroy function",
121 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
122 g_object_class_install_property (object_class
,
124 g_param_spec_pointer ("result-func",
127 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
128 g_object_class_install_property (object_class
,
130 g_param_spec_pointer ("result-destroy",
132 "Result destroy function",
133 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
134 g_object_class_install_property (object_class
,
136 g_param_spec_pointer ("data",
139 G_PARAM_READWRITE
| G_PARAM_CONSTRUCT_ONLY
));
141 g_type_class_add_private (klass
, sizeof (RBThreadPrivate
));
145 rb_thread_init (RBThread
*thread
)
147 thread
->priv
= RB_THREAD_GET_PRIVATE (thread
);
149 thread
->priv
->action_queue
= g_async_queue_new ();
150 thread
->priv
->result_queue
= g_async_queue_new ();
151 thread
->priv
->state_condition
= g_cond_new ();
152 thread
->priv
->action_queue_processors
= 0;
153 thread
->priv
->exit_flag
= 0;
157 rb_thread_finalize (GObject
*object
)
163 g_return_if_fail (object
!= NULL
);
164 g_return_if_fail (RB_IS_THREAD (object
));
166 thread
= RB_THREAD (object
);
168 g_return_if_fail (thread
->priv
!= NULL
);
170 while ((action
= g_async_queue_try_pop (thread
->priv
->action_queue
)) != NULL
)
171 thread
->priv
->action_destroy_func (action
, thread
->priv
->user_data
);
173 for (link
= thread
->priv
->queued_result_callbacks
; link
; link
= link
->next
) {
174 GSource
*source
= link
->data
;
175 g_source_destroy (source
);
177 g_slist_free (thread
->priv
->queued_result_callbacks
);
179 g_async_queue_unref (thread
->priv
->action_queue
);
180 g_async_queue_unref (thread
->priv
->result_queue
);
182 (* G_OBJECT_CLASS (parent_class
)->finalize
) (object
);
187 rb_thread_set_property (GObject
*object
,
192 RBThread
*thread
= RB_THREAD (object
);
197 thread
->priv
->context
= g_value_get_pointer (value
);
199 case PROP_ACTION_FUNC
:
200 thread
->priv
->action_func
= g_value_get_pointer (value
);
202 case PROP_RESULT_FUNC
:
203 thread
->priv
->action_func
= g_value_get_pointer (value
);
205 case PROP_ACTION_DESTROY
:
206 thread
->priv
->action_destroy_func
= g_value_get_pointer (value
);
208 case PROP_RESULT_DESTROY
:
209 thread
->priv
->result_destroy_func
= g_value_get_pointer (value
);
212 thread
->priv
->user_data
= g_value_get_pointer (value
);
215 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
221 rb_thread_get_property (GObject
*object
,
226 RBThread
*thread
= RB_THREAD (object
);
231 g_value_set_pointer (value
, thread
->priv
->context
);
233 case PROP_ACTION_FUNC
:
234 g_value_set_pointer (value
, thread
->priv
->action_func
);
236 case PROP_RESULT_FUNC
:
237 g_value_set_pointer (value
, thread
->priv
->result_func
);
239 case PROP_ACTION_DESTROY
:
240 g_value_set_pointer (value
, thread
->priv
->action_destroy_func
);
242 case PROP_RESULT_DESTROY
:
243 g_value_set_pointer (value
, thread
->priv
->result_destroy_func
);
246 g_value_set_pointer (value
, thread
->priv
->user_data
);
249 G_OBJECT_WARN_INVALID_PROPERTY_ID (object
, prop_id
, pspec
);
255 rb_thread_new (GMainContext
*context
,
256 RBThreadActionFunc action_cb
,
257 RBThreadResultFunc result_cb
,
258 RBThreadActionDestroyFunc action_destroy_func
,
259 RBThreadResultDestroyFunc result_destroy_func
,
262 return RB_THREAD (g_object_new (RB_TYPE_THREAD
,
264 "action-func", action_cb
,
265 "result-func", result_cb
,
266 "destroyfunc", action_destroy_func
,
267 "action-destroy", action_destroy_func
,
268 "result-destroy", result_destroy_func
,
269 "data", user_data
, NULL
));
273 rb_thread_constructor (GType type
, guint n_construct_properties
,
274 GObjectConstructParam
*construct_properties
)
277 RBThreadClass
*klass
;
278 GObjectClass
*parent_class
;
279 GError
*error
= NULL
;
281 klass
= RB_THREAD_CLASS (g_type_class_peek (RB_TYPE_THREAD
));
283 parent_class
= G_OBJECT_CLASS (g_type_class_peek_parent (klass
));
284 thread
= RB_THREAD (parent_class
->constructor (type
, n_construct_properties
,
285 construct_properties
));
286 thread
->priv
->thread
= g_thread_create (rb_thread_action_thread_main
, thread
, TRUE
, &error
);
287 if (!thread
->priv
->thread
)
288 g_error ("Couldn't create thread: %s", error
->message
);
290 /* Wait until the thread's mainloop is running */
291 g_mutex_lock (thread
->priv
->state_mutex
);
292 while (!thread
->priv
->thread_running
)
293 g_cond_wait (thread
->priv
->state_condition
, thread
->priv
->state_mutex
);
294 g_mutex_unlock (thread
->priv
->state_mutex
);
296 return G_OBJECT (thread
);
299 struct RBThreadResultData
306 free_results (gpointer data
)
308 struct RBThreadResultData
*resultdata
= data
;
309 RBThread
*thread
= resultdata
->thread
;
312 for (i
= 0; i
< resultdata
->results
->len
; i
++)
313 thread
->priv
->result_destroy_func (g_ptr_array_index (resultdata
->results
, i
),
314 thread
->priv
->user_data
);
316 g_ptr_array_free (resultdata
->results
, TRUE
);
317 g_object_unref (thread
);
322 process_results (gpointer data
)
324 struct RBThreadResultData
*resultdata
= data
;
325 RBThread
*thread
= resultdata
->thread
;
328 for (i
= 0; i
< resultdata
->results
->len
; i
++)
329 thread
->priv
->result_func (g_ptr_array_index (resultdata
->results
, i
), thread
->priv
->user_data
);
333 #define MAX_ACTIONS 10
335 /* This function as an idle handler in the mainloop of the action
336 * thread; it calls the action function on all queued action and
337 * gathers results, queueing the results for invocation by the
338 * result function in the context of the "target" mainloop
341 process_actions (gpointer data
)
343 RBThread
*thread
= data
;
349 struct RBThreadResultData
*resultdata
;
351 /* Invoke the action function on our queued actions, gathering results */
352 results
= g_ptr_array_new ();
353 while (results
->len
< MAX_ACTIONS
354 && (action
= g_async_queue_try_pop (thread
->priv
->action_queue
))) {
356 if (G_UNLIKELY (thread
->priv
->exit_flag
))
358 result
= thread
->priv
->action_func (action
, thread
->priv
->user_data
, &(thread
->priv
->exit_flag
));
359 thread
->priv
->action_destroy_func (action
, thread
->priv
->user_data
);
361 g_ptr_array_add (results
, result
);
363 /* Race condition here is irrelevant; see rb_thread_push_action */
364 g_atomic_int_dec_and_test (&(thread
->priv
->action_queue_processors
));
366 resultdata
= g_new0 (struct RBThreadResultData
, 1);
367 resultdata
->thread
= g_object_ref (thread
);
368 resultdata
->results
= results
;
370 /* Now queue a callback in the target context */
371 source
= g_idle_source_new ();
372 g_source_set_callback (source
, process_results
, resultdata
, free_results
);
373 g_source_attach (source
, thread
->priv
->target_context
);
374 g_source_unref (source
);
380 rb_thread_push_action (RBThread
*thread
,
383 g_async_queue_push (thread
->priv
->action_queue
, action
);
384 /* Wake the thread up if necessary - note that it is not harmful if
385 * we queue this multiple times; hence the race between the test for
386 * zero and the inc is fine.
388 if (g_atomic_int_get (&thread
->priv
->action_queue_processors
) == 0) {
390 source
= g_idle_source_new ();
392 g_source_set_callback (source
, process_actions
, thread
, NULL
);
393 g_source_attach (source
, thread
->priv
->context
);
394 g_source_unref (source
);
395 g_atomic_int_inc (&(thread
->priv
->action_queue_processors
));
400 mainloop_quit_cb (gpointer data
)
402 RBThread
*thread
= data
;
403 g_main_loop_quit (thread
->priv
->loop
);
408 rb_thread_terminate (RBThread
*thread
)
412 /* Setting the exit flag stops processing in the idle function */
413 g_atomic_int_inc (&(thread
->priv
->exit_flag
));
415 source
= g_idle_source_new ();
416 g_source_set_callback (source
, mainloop_quit_cb
, thread
, NULL
);
417 g_source_attach (source
, thread
->priv
->context
);
418 g_source_unref (source
);
420 g_thread_join (thread
->priv
->thread
);
424 rb_thread_action_thread_main (gpointer data
)
426 RBThread
*thread
= data
;
428 g_mutex_lock (thread
->priv
->state_mutex
);
429 thread
->priv
->thread_running
= TRUE
;
430 g_cond_broadcast (thread
->priv
->state_condition
);
431 g_mutex_unlock (thread
->priv
->state_mutex
);
433 thread
->priv
->context
= g_main_context_new ();
434 thread
->priv
->loop
= g_main_loop_new (thread
->priv
->context
, TRUE
);
436 rb_debug ("running");
438 g_main_loop_run (thread
->priv
->loop
);
440 rb_debug ("exiting");
442 g_main_loop_unref (thread
->priv
->loop
);
444 g_thread_exit (NULL
);