1 /* GDBus - GLib D-Bus Library
3 * Copyright (C) 2008-2010 Red Hat, Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General
16 * Public License along with this library; if not, write to the
17 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
18 * Boston, MA 02111-1307, USA.
20 * Author: David Zeuthen <davidz@redhat.com>
33 #include "gdbusprivate.h"
34 #include "gdbusmessage.h"
35 #include "gdbuserror.h"
36 #include "gdbusintrospection.h"
37 #include "gasyncresult.h"
38 #include "gsimpleasyncresult.h"
39 #include "ginputstream.h"
40 #include "gmemoryinputstream.h"
41 #include "giostream.h"
42 #include "gsocketcontrolmessage.h"
43 #include "gsocketconnection.h"
44 #include "gsocketoutputstream.h"
47 #include "gunixfdmessage.h"
48 #include "gunixconnection.h"
49 #include "gunixcredentialsmessage.h"
58 static gboolean
_g_dbus_worker_do_initial_read (gpointer data
);
60 /* ---------------------------------------------------------------------------------------------------- */
63 _g_dbus_hexdump (const gchar
*data
, gsize len
, guint indent
)
68 ret
= g_string_new (NULL
);
70 for (n
= 0; n
< len
; n
+= 16)
72 g_string_append_printf (ret
, "%*s%04x: ", indent
, "", n
);
74 for (m
= n
; m
< n
+ 16; m
++)
76 if (m
> n
&& (m
%4) == 0)
77 g_string_append_c (ret
, ' ');
79 g_string_append_printf (ret
, "%02x ", (guchar
) data
[m
]);
81 g_string_append (ret
, " ");
84 g_string_append (ret
, " ");
86 for (m
= n
; m
< len
&& m
< n
+ 16; m
++)
87 g_string_append_c (ret
, g_ascii_isprint (data
[m
]) ? data
[m
] : '.');
89 g_string_append_c (ret
, '\n');
92 return g_string_free (ret
, FALSE
);
95 /* ---------------------------------------------------------------------------------------------------- */
97 /* Unfortunately ancillary messages are discarded when reading from a
98 * socket using the GSocketInputStream abstraction. So we provide a
99 * very GInputStream-ish API that uses GSocket in this case (very
100 * similar to GSocketInputStream).
106 GCancellable
*cancellable
;
111 GSocketControlMessage
***messages
;
114 GSimpleAsyncResult
*simple
;
116 gboolean from_mainloop
;
117 } ReadWithControlData
;
120 read_with_control_data_free (ReadWithControlData
*data
)
122 g_object_unref (data
->socket
);
123 if (data
->cancellable
!= NULL
)
124 g_object_unref (data
->cancellable
);
125 g_object_unref (data
->simple
);
130 _g_socket_read_with_control_messages_ready (GSocket
*socket
,
131 GIOCondition condition
,
134 ReadWithControlData
*data
= user_data
;
140 vector
.buffer
= data
->buffer
;
141 vector
.size
= data
->count
;
142 result
= g_socket_receive_message (data
->socket
,
153 g_simple_async_result_set_op_res_gssize (data
->simple
, result
);
157 g_assert (error
!= NULL
);
158 g_simple_async_result_take_error (data
->simple
, error
);
161 if (data
->from_mainloop
)
162 g_simple_async_result_complete (data
->simple
);
164 g_simple_async_result_complete_in_idle (data
->simple
);
170 _g_socket_read_with_control_messages (GSocket
*socket
,
173 GSocketControlMessage
***messages
,
176 GCancellable
*cancellable
,
177 GAsyncReadyCallback callback
,
180 ReadWithControlData
*data
;
182 data
= g_new0 (ReadWithControlData
, 1);
183 data
->socket
= g_object_ref (socket
);
184 data
->cancellable
= cancellable
!= NULL
? g_object_ref (cancellable
) : NULL
;
185 data
->buffer
= buffer
;
187 data
->messages
= messages
;
188 data
->num_messages
= num_messages
;
190 data
->simple
= g_simple_async_result_new (G_OBJECT (socket
),
193 _g_socket_read_with_control_messages
);
195 if (!g_socket_condition_check (socket
, G_IO_IN
))
198 data
->from_mainloop
= TRUE
;
199 source
= g_socket_create_source (data
->socket
,
200 G_IO_IN
| G_IO_HUP
| G_IO_ERR
,
202 g_source_set_callback (source
,
203 (GSourceFunc
) _g_socket_read_with_control_messages_ready
,
205 (GDestroyNotify
) read_with_control_data_free
);
206 g_source_attach (source
, g_main_context_get_thread_default ());
207 g_source_unref (source
);
211 _g_socket_read_with_control_messages_ready (data
->socket
, G_IO_IN
, data
);
212 read_with_control_data_free (data
);
217 _g_socket_read_with_control_messages_finish (GSocket
*socket
,
218 GAsyncResult
*result
,
221 GSimpleAsyncResult
*simple
= G_SIMPLE_ASYNC_RESULT (result
);
223 g_return_val_if_fail (G_IS_SOCKET (socket
), -1);
224 g_warn_if_fail (g_simple_async_result_get_source_tag (simple
) == _g_socket_read_with_control_messages
);
226 if (g_simple_async_result_propagate_error (simple
, error
))
229 return g_simple_async_result_get_op_res_gssize (simple
);
232 /* ---------------------------------------------------------------------------------------------------- */
234 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
236 static GPtrArray
*ensured_classes
= NULL
;
239 ensure_type (GType gtype
)
241 g_ptr_array_add (ensured_classes
, g_type_class_ref (gtype
));
245 release_required_types (void)
247 g_ptr_array_foreach (ensured_classes
, (GFunc
) g_type_class_unref
, NULL
);
248 g_ptr_array_unref (ensured_classes
);
249 ensured_classes
= NULL
;
253 ensure_required_types (void)
255 g_assert (ensured_classes
== NULL
);
256 ensured_classes
= g_ptr_array_new ();
257 ensure_type (G_TYPE_SIMPLE_ASYNC_RESULT
);
258 ensure_type (G_TYPE_MEMORY_INPUT_STREAM
);
260 /* ---------------------------------------------------------------------------------------------------- */
264 volatile gint refcount
;
266 GMainContext
*context
;
271 gdbus_shared_thread_func (gpointer user_data
)
273 SharedThreadData
*data
= user_data
;
275 g_main_context_push_thread_default (data
->context
);
276 g_main_loop_run (data
->loop
);
277 g_main_context_pop_thread_default (data
->context
);
279 release_required_types ();
284 /* ---------------------------------------------------------------------------------------------------- */
286 static SharedThreadData
*
287 _g_dbus_shared_thread_ref (void)
289 static gsize shared_thread_data
= 0;
290 SharedThreadData
*ret
;
292 if (g_once_init_enter (&shared_thread_data
))
294 SharedThreadData
*data
;
296 /* Work-around for https://bugzilla.gnome.org/show_bug.cgi?id=627724 */
297 ensure_required_types ();
299 data
= g_new0 (SharedThreadData
, 1);
302 data
->context
= g_main_context_new ();
303 data
->loop
= g_main_loop_new (data
->context
, FALSE
);
304 data
->thread
= g_thread_new ("gdbus",
305 gdbus_shared_thread_func
,
307 /* We can cast between gsize and gpointer safely */
308 g_once_init_leave (&shared_thread_data
, (gsize
) data
);
311 ret
= (SharedThreadData
*) shared_thread_data
;
312 g_atomic_int_inc (&ret
->refcount
);
317 _g_dbus_shared_thread_unref (SharedThreadData
*data
)
319 /* TODO: actually destroy the shared thread here */
321 g_assert (data
!= NULL
);
322 if (g_atomic_int_dec_and_test (&data
->refcount
))
324 g_main_loop_quit (data
->loop
);
325 //g_thread_join (data->thread);
326 g_main_loop_unref (data
->loop
);
327 g_main_context_unref (data
->context
);
332 /* ---------------------------------------------------------------------------------------------------- */
336 volatile gint ref_count
;
338 SharedThreadData
*shared_thread_data
;
340 /* really a boolean, but GLib 2.28 lacks atomic boolean ops */
341 volatile gint stopped
;
343 /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
344 * only affects messages received from the other peer (since GDBusServer is the
345 * only user) - we might want it to affect messages sent to the other peer too?
348 GDBusCapabilityFlags capabilities
;
349 GQueue
*received_messages_while_frozen
;
352 GCancellable
*cancellable
;
353 GDBusWorkerMessageReceivedCallback message_received_callback
;
354 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback
;
355 GDBusWorkerDisconnectedCallback disconnected_callback
;
358 /* if not NULL, stream is GSocketConnection */
361 /* used for reading */
364 gsize read_buffer_allocated_size
;
365 gsize read_buffer_cur_size
;
366 gsize read_buffer_bytes_wanted
;
367 GUnixFDList
*read_fd_list
;
368 GSocketControlMessage
**read_ancillary_messages
;
369 gint read_num_ancillary_messages
;
371 /* TRUE if an async write, flush or close is pending.
372 * Only the worker thread may change its value, and only with the write_lock.
373 * Other threads may read its value when holding the write_lock.
374 * The worker thread may read its value at any time.
376 gboolean output_pending
;
377 /* used for writing */
379 /* queue of MessageToWriteData, protected by write_lock */
381 /* protected by write_lock */
382 guint64 write_num_messages_written
;
383 /* list of FlushData, protected by write_lock */
384 GList
*write_pending_flushes
;
385 /* list of CloseData, protected by write_lock */
386 GList
*pending_close_attempts
;
387 /* no lock - only used from the worker thread */
388 gboolean close_expected
;
391 static void _g_dbus_worker_unref (GDBusWorker
*worker
);
393 /* ---------------------------------------------------------------------------------------------------- */
399 guint64 number_to_wait_for
;
403 struct _MessageToWriteData
;
404 typedef struct _MessageToWriteData MessageToWriteData
;
406 static void message_to_write_data_free (MessageToWriteData
*data
);
408 static void read_message_print_transport_debug (gssize bytes_read
,
409 GDBusWorker
*worker
);
411 static void write_message_print_transport_debug (gssize bytes_written
,
412 MessageToWriteData
*data
);
416 GCancellable
*cancellable
;
417 GSimpleAsyncResult
*result
;
420 static void close_data_free (CloseData
*close_data
)
422 if (close_data
->cancellable
!= NULL
)
423 g_object_unref (close_data
->cancellable
);
425 if (close_data
->result
!= NULL
)
426 g_object_unref (close_data
->result
);
428 _g_dbus_worker_unref (close_data
->worker
);
429 g_slice_free (CloseData
, close_data
);
432 /* ---------------------------------------------------------------------------------------------------- */
435 _g_dbus_worker_ref (GDBusWorker
*worker
)
437 g_atomic_int_inc (&worker
->ref_count
);
442 _g_dbus_worker_unref (GDBusWorker
*worker
)
444 if (g_atomic_int_dec_and_test (&worker
->ref_count
))
446 g_assert (worker
->write_pending_flushes
== NULL
);
448 _g_dbus_shared_thread_unref (worker
->shared_thread_data
);
450 g_object_unref (worker
->stream
);
452 g_mutex_clear (&worker
->read_lock
);
453 g_object_unref (worker
->cancellable
);
454 if (worker
->read_fd_list
!= NULL
)
455 g_object_unref (worker
->read_fd_list
);
457 g_queue_foreach (worker
->received_messages_while_frozen
, (GFunc
) g_object_unref
, NULL
);
458 g_queue_free (worker
->received_messages_while_frozen
);
460 g_mutex_clear (&worker
->write_lock
);
461 g_queue_foreach (worker
->write_queue
, (GFunc
) message_to_write_data_free
, NULL
);
462 g_queue_free (worker
->write_queue
);
464 g_free (worker
->read_buffer
);
471 _g_dbus_worker_emit_disconnected (GDBusWorker
*worker
,
472 gboolean remote_peer_vanished
,
475 if (!g_atomic_int_get (&worker
->stopped
))
476 worker
->disconnected_callback (worker
, remote_peer_vanished
, error
, worker
->user_data
);
480 _g_dbus_worker_emit_message_received (GDBusWorker
*worker
,
481 GDBusMessage
*message
)
483 if (!g_atomic_int_get (&worker
->stopped
))
484 worker
->message_received_callback (worker
, message
, worker
->user_data
);
487 static GDBusMessage
*
488 _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker
*worker
,
489 GDBusMessage
*message
)
492 if (!g_atomic_int_get (&worker
->stopped
))
493 ret
= worker
->message_about_to_be_sent_callback (worker
, message
, worker
->user_data
);
499 /* can only be called from private thread with read-lock held - takes ownership of @message */
501 _g_dbus_worker_queue_or_deliver_received_message (GDBusWorker
*worker
,
502 GDBusMessage
*message
)
504 if (worker
->frozen
|| g_queue_get_length (worker
->received_messages_while_frozen
) > 0)
507 g_queue_push_tail (worker
->received_messages_while_frozen
, message
);
511 /* not frozen, nor anything in queue */
512 _g_dbus_worker_emit_message_received (worker
, message
);
513 g_object_unref (message
);
517 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
519 unfreeze_in_idle_cb (gpointer user_data
)
521 GDBusWorker
*worker
= user_data
;
522 GDBusMessage
*message
;
524 g_mutex_lock (&worker
->read_lock
);
527 while ((message
= g_queue_pop_head (worker
->received_messages_while_frozen
)) != NULL
)
529 _g_dbus_worker_emit_message_received (worker
, message
);
530 g_object_unref (message
);
532 worker
->frozen
= FALSE
;
536 g_assert (g_queue_get_length (worker
->received_messages_while_frozen
) == 0);
538 g_mutex_unlock (&worker
->read_lock
);
542 /* can be called from any thread */
544 _g_dbus_worker_unfreeze (GDBusWorker
*worker
)
546 GSource
*idle_source
;
547 idle_source
= g_idle_source_new ();
548 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
549 g_source_set_callback (idle_source
,
551 _g_dbus_worker_ref (worker
),
552 (GDestroyNotify
) _g_dbus_worker_unref
);
553 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
554 g_source_unref (idle_source
);
557 /* ---------------------------------------------------------------------------------------------------- */
559 static void _g_dbus_worker_do_read_unlocked (GDBusWorker
*worker
);
561 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
563 _g_dbus_worker_do_read_cb (GInputStream
*input_stream
,
567 GDBusWorker
*worker
= user_data
;
571 g_mutex_lock (&worker
->read_lock
);
573 /* If already stopped, don't even process the reply */
574 if (g_atomic_int_get (&worker
->stopped
))
578 if (worker
->socket
== NULL
)
579 bytes_read
= g_input_stream_read_finish (g_io_stream_get_input_stream (worker
->stream
),
583 bytes_read
= _g_socket_read_with_control_messages_finish (worker
->socket
,
586 if (worker
->read_num_ancillary_messages
> 0)
589 for (n
= 0; n
< worker
->read_num_ancillary_messages
; n
++)
591 GSocketControlMessage
*control_message
= G_SOCKET_CONTROL_MESSAGE (worker
->read_ancillary_messages
[n
]);
597 else if (G_IS_UNIX_FD_MESSAGE (control_message
))
599 GUnixFDMessage
*fd_message
;
603 fd_message
= G_UNIX_FD_MESSAGE (control_message
);
604 fds
= g_unix_fd_message_steal_fds (fd_message
, &num_fds
);
605 if (worker
->read_fd_list
== NULL
)
607 worker
->read_fd_list
= g_unix_fd_list_new_from_array (fds
, num_fds
);
612 for (n
= 0; n
< num_fds
; n
++)
614 /* TODO: really want a append_steal() */
615 g_unix_fd_list_append (worker
->read_fd_list
, fds
[n
], NULL
);
621 else if (G_IS_UNIX_CREDENTIALS_MESSAGE (control_message
))
633 "Unexpected ancillary message of type %s received from peer",
634 g_type_name (G_TYPE_FROM_INSTANCE (control_message
)));
635 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
636 g_error_free (error
);
637 g_object_unref (control_message
);
639 while (n
< worker
->read_num_ancillary_messages
)
640 g_object_unref (worker
->read_ancillary_messages
[n
++]);
641 g_free (worker
->read_ancillary_messages
);
645 g_object_unref (control_message
);
647 g_free (worker
->read_ancillary_messages
);
650 if (bytes_read
== -1)
652 if (G_UNLIKELY (_g_dbus_debug_transport ()))
654 _g_dbus_debug_print_lock ();
655 g_print ("========================================================================\n"
656 "GDBus-debug:Transport:\n"
657 " ---- READ ERROR on stream of type %s:\n"
659 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker
->stream
))),
660 g_quark_to_string (error
->domain
), error
->code
,
662 _g_dbus_debug_print_unlock ();
665 /* Every async read that uses this callback uses worker->cancellable
666 * as its GCancellable. worker->cancellable gets cancelled if and only
667 * if the GDBusConnection tells us to close (either via
668 * _g_dbus_worker_stop, which is called on last-unref, or directly),
669 * so a cancelled read must mean our connection was closed locally.
671 * If we're closing, other errors are possible - notably,
672 * G_IO_ERROR_CLOSED can be seen if we close the stream with an async
673 * read in-flight. It seems sensible to treat all read errors during
674 * closing as an expected thing that doesn't trip exit-on-close.
676 * Because close_expected can't be set until we get into the worker
677 * thread, but the cancellable is signalled sooner (from another
678 * thread), we do still need to check the error.
680 if (worker
->close_expected
||
681 g_error_matches (error
, G_IO_ERROR
, G_IO_ERROR_CANCELLED
))
682 _g_dbus_worker_emit_disconnected (worker
, FALSE
, NULL
);
684 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
686 g_error_free (error
);
691 g_debug ("read %d bytes (is_closed=%d blocking=%d condition=0x%02x) stream %p, %p",
693 g_socket_is_closed (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
))),
694 g_socket_get_blocking (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
))),
695 g_socket_condition_check (g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
)),
696 G_IO_IN
| G_IO_OUT
| G_IO_HUP
),
701 /* TODO: hmm, hmm... */
707 "Underlying GIOStream returned 0 bytes on an async read");
708 _g_dbus_worker_emit_disconnected (worker
, TRUE
, error
);
709 g_error_free (error
);
713 read_message_print_transport_debug (bytes_read
, worker
);
715 worker
->read_buffer_cur_size
+= bytes_read
;
716 if (worker
->read_buffer_bytes_wanted
== worker
->read_buffer_cur_size
)
718 /* OK, got what we asked for! */
719 if (worker
->read_buffer_bytes_wanted
== 16)
722 /* OK, got the header - determine how many more bytes are needed */
724 message_len
= g_dbus_message_bytes_needed ((guchar
*) worker
->read_buffer
,
727 if (message_len
== -1)
729 g_warning ("_g_dbus_worker_do_read_cb: error determing bytes needed: %s", error
->message
);
730 _g_dbus_worker_emit_disconnected (worker
, FALSE
, error
);
731 g_error_free (error
);
735 worker
->read_buffer_bytes_wanted
= message_len
;
736 _g_dbus_worker_do_read_unlocked (worker
);
740 GDBusMessage
*message
;
743 /* TODO: use connection->priv->auth to decode the message */
745 message
= g_dbus_message_new_from_blob ((guchar
*) worker
->read_buffer
,
746 worker
->read_buffer_cur_size
,
747 worker
->capabilities
,
752 s
= _g_dbus_hexdump (worker
->read_buffer
, worker
->read_buffer_cur_size
, 2);
753 g_warning ("Error decoding D-Bus message of %" G_GSIZE_FORMAT
" bytes\n"
755 "The payload is as follows:\n"
757 worker
->read_buffer_cur_size
,
761 _g_dbus_worker_emit_disconnected (worker
, FALSE
, error
);
762 g_error_free (error
);
767 if (worker
->read_fd_list
!= NULL
)
769 g_dbus_message_set_unix_fd_list (message
, worker
->read_fd_list
);
770 g_object_unref (worker
->read_fd_list
);
771 worker
->read_fd_list
= NULL
;
775 if (G_UNLIKELY (_g_dbus_debug_message ()))
778 _g_dbus_debug_print_lock ();
779 g_print ("========================================================================\n"
780 "GDBus-debug:Message:\n"
781 " <<<< RECEIVED D-Bus message (%" G_GSIZE_FORMAT
" bytes)\n",
782 worker
->read_buffer_cur_size
);
783 s
= g_dbus_message_print (message
, 2);
786 if (G_UNLIKELY (_g_dbus_debug_payload ()))
788 s
= _g_dbus_hexdump (worker
->read_buffer
, worker
->read_buffer_cur_size
, 2);
792 _g_dbus_debug_print_unlock ();
795 /* yay, got a message, go deliver it */
796 _g_dbus_worker_queue_or_deliver_received_message (worker
, message
);
798 /* start reading another message! */
799 worker
->read_buffer_bytes_wanted
= 0;
800 worker
->read_buffer_cur_size
= 0;
801 _g_dbus_worker_do_read_unlocked (worker
);
806 /* didn't get all the bytes we requested - so repeat the request... */
807 _g_dbus_worker_do_read_unlocked (worker
);
811 g_mutex_unlock (&worker
->read_lock
);
813 /* gives up the reference acquired when calling g_input_stream_read_async() */
814 _g_dbus_worker_unref (worker
);
817 /* called in private thread shared by all GDBusConnection instances (with read-lock held) */
819 _g_dbus_worker_do_read_unlocked (GDBusWorker
*worker
)
821 /* Note that we do need to keep trying to read even if close_expected is
822 * true, because only failing a read causes us to signal 'closed'.
825 /* if bytes_wanted is zero, it means start reading a message */
826 if (worker
->read_buffer_bytes_wanted
== 0)
828 worker
->read_buffer_cur_size
= 0;
829 worker
->read_buffer_bytes_wanted
= 16;
832 /* ensure we have a (big enough) buffer */
833 if (worker
->read_buffer
== NULL
|| worker
->read_buffer_bytes_wanted
> worker
->read_buffer_allocated_size
)
835 /* TODO: 4096 is randomly chosen; might want a better chosen default minimum */
836 worker
->read_buffer_allocated_size
= MAX (worker
->read_buffer_bytes_wanted
, 4096);
837 worker
->read_buffer
= g_realloc (worker
->read_buffer
, worker
->read_buffer_allocated_size
);
840 if (worker
->socket
== NULL
)
841 g_input_stream_read_async (g_io_stream_get_input_stream (worker
->stream
),
842 worker
->read_buffer
+ worker
->read_buffer_cur_size
,
843 worker
->read_buffer_bytes_wanted
- worker
->read_buffer_cur_size
,
846 (GAsyncReadyCallback
) _g_dbus_worker_do_read_cb
,
847 _g_dbus_worker_ref (worker
));
850 worker
->read_ancillary_messages
= NULL
;
851 worker
->read_num_ancillary_messages
= 0;
852 _g_socket_read_with_control_messages (worker
->socket
,
853 worker
->read_buffer
+ worker
->read_buffer_cur_size
,
854 worker
->read_buffer_bytes_wanted
- worker
->read_buffer_cur_size
,
855 &worker
->read_ancillary_messages
,
856 &worker
->read_num_ancillary_messages
,
859 (GAsyncReadyCallback
) _g_dbus_worker_do_read_cb
,
860 _g_dbus_worker_ref (worker
));
864 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
866 _g_dbus_worker_do_initial_read (gpointer data
)
868 GDBusWorker
*worker
= data
;
869 g_mutex_lock (&worker
->read_lock
);
870 _g_dbus_worker_do_read_unlocked (worker
);
871 g_mutex_unlock (&worker
->read_lock
);
875 /* ---------------------------------------------------------------------------------------------------- */
877 struct _MessageToWriteData
880 GDBusMessage
*message
;
885 GSimpleAsyncResult
*simple
;
890 message_to_write_data_free (MessageToWriteData
*data
)
892 _g_dbus_worker_unref (data
->worker
);
894 g_object_unref (data
->message
);
899 /* ---------------------------------------------------------------------------------------------------- */
901 static void write_message_continue_writing (MessageToWriteData
*data
);
903 /* called in private thread shared by all GDBusConnection instances
905 * write-lock is not held on entry
906 * output_pending is true on entry
909 write_message_async_cb (GObject
*source_object
,
913 MessageToWriteData
*data
= user_data
;
914 GSimpleAsyncResult
*simple
;
915 gssize bytes_written
;
918 /* Note: we can't access data->simple after calling g_async_result_complete () because the
919 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
921 simple
= data
->simple
;
924 bytes_written
= g_output_stream_write_finish (G_OUTPUT_STREAM (source_object
),
927 if (bytes_written
== -1)
929 g_simple_async_result_take_error (simple
, error
);
930 g_simple_async_result_complete (simple
);
931 g_object_unref (simple
);
934 g_assert (bytes_written
> 0); /* zero is never returned */
936 write_message_print_transport_debug (bytes_written
, data
);
938 data
->total_written
+= bytes_written
;
939 g_assert (data
->total_written
<= data
->blob_size
);
940 if (data
->total_written
== data
->blob_size
)
942 g_simple_async_result_complete (simple
);
943 g_object_unref (simple
);
947 write_message_continue_writing (data
);
953 /* called in private thread shared by all GDBusConnection instances
955 * write-lock is not held on entry
956 * output_pending is true on entry
959 on_socket_ready (GSocket
*socket
,
960 GIOCondition condition
,
963 MessageToWriteData
*data
= user_data
;
964 write_message_continue_writing (data
);
965 return FALSE
; /* remove source */
968 /* called in private thread shared by all GDBusConnection instances
970 * write-lock is not held on entry
971 * output_pending is true on entry
974 write_message_continue_writing (MessageToWriteData
*data
)
976 GOutputStream
*ostream
;
977 GSimpleAsyncResult
*simple
;
979 GUnixFDList
*fd_list
;
982 /* Note: we can't access data->simple after calling g_async_result_complete () because the
983 * callback can free @data and we're not completing in idle. So use a copy of the pointer.
985 simple
= data
->simple
;
987 ostream
= g_io_stream_get_output_stream (data
->worker
->stream
);
989 fd_list
= g_dbus_message_get_unix_fd_list (data
->message
);
992 g_assert (!g_output_stream_has_pending (ostream
));
993 g_assert_cmpint (data
->total_written
, <, data
->blob_size
);
999 else if (G_IS_SOCKET_OUTPUT_STREAM (ostream
) && data
->total_written
== 0)
1001 GOutputVector vector
;
1002 GSocketControlMessage
*control_message
;
1003 gssize bytes_written
;
1006 vector
.buffer
= data
->blob
;
1007 vector
.size
= data
->blob_size
;
1009 control_message
= NULL
;
1010 if (fd_list
!= NULL
&& g_unix_fd_list_get_length (fd_list
) > 0)
1012 if (!(data
->worker
->capabilities
& G_DBUS_CAPABILITY_FLAGS_UNIX_FD_PASSING
))
1014 g_simple_async_result_set_error (simple
,
1017 "Tried sending a file descriptor but remote peer does not support this capability");
1018 g_simple_async_result_complete (simple
);
1019 g_object_unref (simple
);
1022 control_message
= g_unix_fd_message_new_with_fd_list (fd_list
);
1026 bytes_written
= g_socket_send_message (data
->worker
->socket
,
1030 control_message
!= NULL
? &control_message
: NULL
,
1031 control_message
!= NULL
? 1 : 0,
1033 data
->worker
->cancellable
,
1035 if (control_message
!= NULL
)
1036 g_object_unref (control_message
);
1038 if (bytes_written
== -1)
1040 /* Handle WOULD_BLOCK by waiting until there's room in the buffer */
1041 if (g_error_matches (error
, G_IO_ERROR
, G_IO_ERROR_WOULD_BLOCK
))
1044 source
= g_socket_create_source (data
->worker
->socket
,
1045 G_IO_OUT
| G_IO_HUP
| G_IO_ERR
,
1046 data
->worker
->cancellable
);
1047 g_source_set_callback (source
,
1048 (GSourceFunc
) on_socket_ready
,
1050 NULL
); /* GDestroyNotify */
1051 g_source_attach (source
, g_main_context_get_thread_default ());
1052 g_source_unref (source
);
1053 g_error_free (error
);
1056 g_simple_async_result_take_error (simple
, error
);
1057 g_simple_async_result_complete (simple
);
1058 g_object_unref (simple
);
1061 g_assert (bytes_written
> 0); /* zero is never returned */
1063 write_message_print_transport_debug (bytes_written
, data
);
1065 data
->total_written
+= bytes_written
;
1066 g_assert (data
->total_written
<= data
->blob_size
);
1067 if (data
->total_written
== data
->blob_size
)
1069 g_simple_async_result_complete (simple
);
1070 g_object_unref (simple
);
1074 write_message_continue_writing (data
);
1080 if (fd_list
!= NULL
)
1082 g_simple_async_result_set_error (simple
,
1085 "Tried sending a file descriptor on unsupported stream of type %s",
1086 g_type_name (G_TYPE_FROM_INSTANCE (ostream
)));
1087 g_simple_async_result_complete (simple
);
1088 g_object_unref (simple
);
1093 g_output_stream_write_async (ostream
,
1094 (const gchar
*) data
->blob
+ data
->total_written
,
1095 data
->blob_size
- data
->total_written
,
1097 data
->worker
->cancellable
,
1098 write_message_async_cb
,
1105 /* called in private thread shared by all GDBusConnection instances
1107 * write-lock is not held on entry
1108 * output_pending is true on entry
1111 write_message_async (GDBusWorker
*worker
,
1112 MessageToWriteData
*data
,
1113 GAsyncReadyCallback callback
,
1116 data
->simple
= g_simple_async_result_new (NULL
,
1119 write_message_async
);
1120 data
->total_written
= 0;
1121 write_message_continue_writing (data
);
1124 /* called in private thread shared by all GDBusConnection instances (without write-lock held) */
1126 write_message_finish (GAsyncResult
*res
,
1129 g_warn_if_fail (g_simple_async_result_get_source_tag (G_SIMPLE_ASYNC_RESULT (res
)) == write_message_async
);
1130 if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res
), error
))
1135 /* ---------------------------------------------------------------------------------------------------- */
1137 static void maybe_write_next_message (GDBusWorker
*worker
);
1141 GDBusWorker
*worker
;
1146 flush_data_list_complete (const GList
*flushers
,
1147 const GError
*error
)
1151 for (l
= flushers
; l
!= NULL
; l
= l
->next
)
1153 FlushData
*f
= l
->data
;
1155 f
->error
= error
!= NULL
? g_error_copy (error
) : NULL
;
1157 g_mutex_lock (&f
->mutex
);
1158 g_cond_signal (&f
->cond
);
1159 g_mutex_unlock (&f
->mutex
);
1163 /* called in private thread shared by all GDBusConnection instances
1165 * write-lock is not held on entry
1166 * output_pending is true on entry
1169 ostream_flush_cb (GObject
*source_object
,
1173 FlushAsyncData
*data
= user_data
;
1177 g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object
),
1183 if (G_UNLIKELY (_g_dbus_debug_transport ()))
1185 _g_dbus_debug_print_lock ();
1186 g_print ("========================================================================\n"
1187 "GDBus-debug:Transport:\n"
1188 " ---- FLUSHED stream of type %s\n",
1189 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data
->worker
->stream
))));
1190 _g_dbus_debug_print_unlock ();
1194 g_assert (data
->flushers
!= NULL
);
1195 flush_data_list_complete (data
->flushers
, error
);
1196 g_list_free (data
->flushers
);
1199 g_error_free (error
);
1201 /* Make sure we tell folks that we don't have additional
1203 g_mutex_lock (&data
->worker
->write_lock
);
1204 g_assert (data
->worker
->output_pending
);
1205 data
->worker
->output_pending
= FALSE
;
1206 g_mutex_unlock (&data
->worker
->write_lock
);
1208 /* OK, cool, finally kick off the next write */
1209 maybe_write_next_message (data
->worker
);
1211 _g_dbus_worker_unref (data
->worker
);
1215 /* called in private thread shared by all GDBusConnection instances
1217 * write-lock is not held on entry
1218 * output_pending is false on entry
1221 message_written (GDBusWorker
*worker
,
1222 MessageToWriteData
*message_data
)
1228 /* first log the fact that we wrote a message */
1229 if (G_UNLIKELY (_g_dbus_debug_message ()))
1232 _g_dbus_debug_print_lock ();
1233 g_print ("========================================================================\n"
1234 "GDBus-debug:Message:\n"
1235 " >>>> SENT D-Bus message (%" G_GSIZE_FORMAT
" bytes)\n",
1236 message_data
->blob_size
);
1237 s
= g_dbus_message_print (message_data
->message
, 2);
1240 if (G_UNLIKELY (_g_dbus_debug_payload ()))
1242 s
= _g_dbus_hexdump (message_data
->blob
, message_data
->blob_size
, 2);
1243 g_print ("%s\n", s
);
1246 _g_dbus_debug_print_unlock ();
1249 /* then first wake up pending flushes and, if needed, flush the stream */
1251 g_mutex_lock (&worker
->write_lock
);
1252 worker
->write_num_messages_written
+= 1;
1253 for (l
= worker
->write_pending_flushes
; l
!= NULL
; l
= ll
)
1255 FlushData
*f
= l
->data
;
1258 if (f
->number_to_wait_for
== worker
->write_num_messages_written
)
1260 flushers
= g_list_append (flushers
, f
);
1261 worker
->write_pending_flushes
= g_list_delete_link (worker
->write_pending_flushes
, l
);
1264 if (flushers
!= NULL
)
1266 g_assert (!worker
->output_pending
);
1267 worker
->output_pending
= TRUE
;
1269 g_mutex_unlock (&worker
->write_lock
);
1271 if (flushers
!= NULL
)
1273 FlushAsyncData
*data
;
1274 data
= g_new0 (FlushAsyncData
, 1);
1275 data
->worker
= _g_dbus_worker_ref (worker
);
1276 data
->flushers
= flushers
;
1277 /* flush the stream before writing the next message */
1278 g_output_stream_flush_async (g_io_stream_get_output_stream (worker
->stream
),
1280 worker
->cancellable
,
1286 /* kick off the next write! */
1287 maybe_write_next_message (worker
);
1291 /* called in private thread shared by all GDBusConnection instances
1293 * write-lock is not held on entry
1294 * output_pending is true on entry
1297 write_message_cb (GObject
*source_object
,
1301 MessageToWriteData
*data
= user_data
;
1304 g_mutex_lock (&data
->worker
->write_lock
);
1305 g_assert (data
->worker
->output_pending
);
1306 data
->worker
->output_pending
= FALSE
;
1307 g_mutex_unlock (&data
->worker
->write_lock
);
1310 if (!write_message_finish (res
, &error
))
1313 _g_dbus_worker_emit_disconnected (data
->worker
, TRUE
, error
);
1314 g_error_free (error
);
1317 /* this function will also kick of the next write (it might need to
1318 * flush so writing the next message might happen much later
1321 message_written (data
->worker
, data
);
1323 message_to_write_data_free (data
);
1326 /* called in private thread shared by all GDBusConnection instances
1328 * write-lock is not held on entry
1329 * output_pending is true on entry
1332 iostream_close_cb (GObject
*source_object
,
1336 GDBusWorker
*worker
= user_data
;
1337 GError
*error
= NULL
;
1338 GList
*pending_close_attempts
, *pending_flush_attempts
;
1341 g_io_stream_close_finish (worker
->stream
, res
, &error
);
1343 g_mutex_lock (&worker
->write_lock
);
1345 pending_close_attempts
= worker
->pending_close_attempts
;
1346 worker
->pending_close_attempts
= NULL
;
1348 pending_flush_attempts
= worker
->write_pending_flushes
;
1349 worker
->write_pending_flushes
= NULL
;
1351 send_queue
= worker
->write_queue
;
1352 worker
->write_queue
= g_queue_new ();
1354 g_assert (worker
->output_pending
);
1355 worker
->output_pending
= FALSE
;
1357 g_mutex_unlock (&worker
->write_lock
);
1359 while (pending_close_attempts
!= NULL
)
1361 CloseData
*close_data
= pending_close_attempts
->data
;
1363 pending_close_attempts
= g_list_delete_link (pending_close_attempts
,
1364 pending_close_attempts
);
1366 if (close_data
->result
!= NULL
)
1369 g_simple_async_result_set_from_error (close_data
->result
, error
);
1371 /* this must be in an idle because the result is likely to be
1372 * intended for another thread
1374 g_simple_async_result_complete_in_idle (close_data
->result
);
1377 close_data_free (close_data
);
1380 g_clear_error (&error
);
1382 /* all messages queued for sending are discarded */
1383 g_queue_foreach (send_queue
, (GFunc
) message_to_write_data_free
, NULL
);
1384 g_queue_free (send_queue
);
1386 /* all queued flushes fail */
1387 error
= g_error_new (G_IO_ERROR
, G_IO_ERROR_CANCELLED
,
1388 _("Operation was cancelled"));
1389 flush_data_list_complete (pending_flush_attempts
, error
);
1390 g_list_free (pending_flush_attempts
);
1391 g_clear_error (&error
);
1393 _g_dbus_worker_unref (worker
);
1396 /* called in private thread shared by all GDBusConnection instances
1398 * write-lock is not held on entry
1399 * output_pending must be false on entry
1402 maybe_write_next_message (GDBusWorker
*worker
)
1404 MessageToWriteData
*data
;
1407 /* we mustn't try to write two things at once */
1408 g_assert (!worker
->output_pending
);
1410 g_mutex_lock (&worker
->write_lock
);
1412 /* if we want to close the connection, that takes precedence */
1413 if (worker
->pending_close_attempts
!= NULL
)
1415 worker
->close_expected
= TRUE
;
1416 worker
->output_pending
= TRUE
;
1418 g_io_stream_close_async (worker
->stream
, G_PRIORITY_DEFAULT
,
1419 NULL
, iostream_close_cb
,
1420 _g_dbus_worker_ref (worker
));
1425 data
= g_queue_pop_head (worker
->write_queue
);
1428 worker
->output_pending
= TRUE
;
1431 g_mutex_unlock (&worker
->write_lock
);
1433 /* Note that write_lock is only used for protecting the @write_queue
1434 * and @output_pending fields of the GDBusWorker struct ... which we
1435 * need to modify from arbitrary threads in _g_dbus_worker_send_message().
1437 * Therefore, it's fine to drop it here when calling back into user
1438 * code and then writing the message out onto the GIOStream since this
1439 * function only runs on the worker thread.
1443 GDBusMessage
*old_message
;
1445 gsize new_blob_size
;
1448 old_message
= data
->message
;
1449 data
->message
= _g_dbus_worker_emit_message_about_to_be_sent (worker
, data
->message
);
1450 if (data
->message
== old_message
)
1452 /* filters had no effect - do nothing */
1454 else if (data
->message
== NULL
)
1456 /* filters dropped message */
1457 g_mutex_lock (&worker
->write_lock
);
1458 worker
->output_pending
= FALSE
;
1459 g_mutex_unlock (&worker
->write_lock
);
1460 message_to_write_data_free (data
);
1465 /* filters altered the message -> reencode */
1467 new_blob
= g_dbus_message_to_blob (data
->message
,
1469 worker
->capabilities
,
1471 if (new_blob
== NULL
)
1473 /* if filter make the GDBusMessage unencodeable, just complain on stderr and send
1474 * the old message instead
1476 g_warning ("Error encoding GDBusMessage with serial %d altered by filter function: %s",
1477 g_dbus_message_get_serial (data
->message
),
1479 g_error_free (error
);
1483 g_free (data
->blob
);
1484 data
->blob
= (gchar
*) new_blob
;
1485 data
->blob_size
= new_blob_size
;
1489 write_message_async (worker
,
1496 /* called in private thread shared by all GDBusConnection instances
1498 * write-lock is not held on entry
1499 * output_pending may be true or false
1502 write_message_in_idle_cb (gpointer user_data
)
1504 GDBusWorker
*worker
= user_data
;
1506 /* Because this is the worker thread, we can read this struct member
1507 * without holding the lock: no other thread ever modifies it.
1509 if (!worker
->output_pending
)
1510 maybe_write_next_message (worker
);
1516 * @write_data: (transfer full) (allow-none):
1517 * @close_data: (transfer full) (allow-none):
1519 * Can be called from any thread
1521 * write_lock is not held on entry
1522 * output_pending may be true or false
1525 schedule_write_in_worker_thread (GDBusWorker
*worker
,
1526 MessageToWriteData
*write_data
,
1527 CloseData
*close_data
)
1529 g_mutex_lock (&worker
->write_lock
);
1531 if (write_data
!= NULL
)
1532 g_queue_push_tail (worker
->write_queue
, write_data
);
1534 if (close_data
!= NULL
)
1535 worker
->pending_close_attempts
= g_list_prepend (worker
->pending_close_attempts
,
1538 if (!worker
->output_pending
)
1540 GSource
*idle_source
;
1541 idle_source
= g_idle_source_new ();
1542 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
1543 g_source_set_callback (idle_source
,
1544 write_message_in_idle_cb
,
1545 _g_dbus_worker_ref (worker
),
1546 (GDestroyNotify
) _g_dbus_worker_unref
);
1547 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
1548 g_source_unref (idle_source
);
1551 g_mutex_unlock (&worker
->write_lock
);
1554 /* ---------------------------------------------------------------------------------------------------- */
1556 /* can be called from any thread - steals blob
1558 * write_lock is not held on entry
1559 * output_pending may be true or false
1562 _g_dbus_worker_send_message (GDBusWorker
*worker
,
1563 GDBusMessage
*message
,
1567 MessageToWriteData
*data
;
1569 g_return_if_fail (G_IS_DBUS_MESSAGE (message
));
1570 g_return_if_fail (blob
!= NULL
);
1571 g_return_if_fail (blob_len
> 16);
1573 data
= g_new0 (MessageToWriteData
, 1);
1574 data
->worker
= _g_dbus_worker_ref (worker
);
1575 data
->message
= g_object_ref (message
);
1576 data
->blob
= blob
; /* steal! */
1577 data
->blob_size
= blob_len
;
1579 schedule_write_in_worker_thread (worker
, data
, NULL
);
1582 /* ---------------------------------------------------------------------------------------------------- */
1585 _g_dbus_worker_new (GIOStream
*stream
,
1586 GDBusCapabilityFlags capabilities
,
1587 gboolean initially_frozen
,
1588 GDBusWorkerMessageReceivedCallback message_received_callback
,
1589 GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback
,
1590 GDBusWorkerDisconnectedCallback disconnected_callback
,
1593 GDBusWorker
*worker
;
1594 GSource
*idle_source
;
1596 g_return_val_if_fail (G_IS_IO_STREAM (stream
), NULL
);
1597 g_return_val_if_fail (message_received_callback
!= NULL
, NULL
);
1598 g_return_val_if_fail (message_about_to_be_sent_callback
!= NULL
, NULL
);
1599 g_return_val_if_fail (disconnected_callback
!= NULL
, NULL
);
1601 worker
= g_new0 (GDBusWorker
, 1);
1602 worker
->ref_count
= 1;
1604 g_mutex_init (&worker
->read_lock
);
1605 worker
->message_received_callback
= message_received_callback
;
1606 worker
->message_about_to_be_sent_callback
= message_about_to_be_sent_callback
;
1607 worker
->disconnected_callback
= disconnected_callback
;
1608 worker
->user_data
= user_data
;
1609 worker
->stream
= g_object_ref (stream
);
1610 worker
->capabilities
= capabilities
;
1611 worker
->cancellable
= g_cancellable_new ();
1612 worker
->output_pending
= FALSE
;
1614 worker
->frozen
= initially_frozen
;
1615 worker
->received_messages_while_frozen
= g_queue_new ();
1617 g_mutex_init (&worker
->write_lock
);
1618 worker
->write_queue
= g_queue_new ();
1620 if (G_IS_SOCKET_CONNECTION (worker
->stream
))
1621 worker
->socket
= g_socket_connection_get_socket (G_SOCKET_CONNECTION (worker
->stream
));
1623 worker
->shared_thread_data
= _g_dbus_shared_thread_ref ();
1626 idle_source
= g_idle_source_new ();
1627 g_source_set_priority (idle_source
, G_PRIORITY_DEFAULT
);
1628 g_source_set_callback (idle_source
,
1629 _g_dbus_worker_do_initial_read
,
1630 _g_dbus_worker_ref (worker
),
1631 (GDestroyNotify
) _g_dbus_worker_unref
);
1632 g_source_attach (idle_source
, worker
->shared_thread_data
->context
);
1633 g_source_unref (idle_source
);
1638 /* ---------------------------------------------------------------------------------------------------- */
1640 /* can be called from any thread
1642 * write_lock is not held on entry
1643 * output_pending may be true or false
1646 _g_dbus_worker_close (GDBusWorker
*worker
,
1647 GCancellable
*cancellable
,
1648 GSimpleAsyncResult
*result
)
1650 CloseData
*close_data
;
1652 close_data
= g_slice_new0 (CloseData
);
1653 close_data
->worker
= _g_dbus_worker_ref (worker
);
1654 close_data
->cancellable
=
1655 (cancellable
== NULL
? NULL
: g_object_ref (cancellable
));
1656 close_data
->result
= (result
== NULL
? NULL
: g_object_ref (result
));
1658 /* Don't set worker->close_expected here - we're in the wrong thread.
1659 * It'll be set before the actual close happens.
1661 g_cancellable_cancel (worker
->cancellable
);
1662 schedule_write_in_worker_thread (worker
, NULL
, close_data
);
1665 /* This can be called from any thread - frees worker. Note that
1666 * callbacks might still happen if called from another thread than the
1667 * worker - use your own synchronization primitive in the callbacks.
1669 * write_lock is not held on entry
1670 * output_pending may be true or false
1673 _g_dbus_worker_stop (GDBusWorker
*worker
)
1675 g_atomic_int_set (&worker
->stopped
, TRUE
);
1677 /* Cancel any pending operations and schedule a close of the underlying I/O
1678 * stream in the worker thread
1680 _g_dbus_worker_close (worker
, NULL
, NULL
);
1682 /* _g_dbus_worker_close holds a ref until after an idle in the the worker
1683 * thread has run, so we no longer need to unref in an idle like in
1686 _g_dbus_worker_unref (worker
);
1689 /* ---------------------------------------------------------------------------------------------------- */
1691 /* can be called from any thread (except the worker thread) - blocks
1692 * calling thread until all queued outgoing messages are written and
1693 * the transport has been flushed
1695 * write_lock is not held on entry
1696 * output_pending may be true or false
1699 _g_dbus_worker_flush_sync (GDBusWorker
*worker
,
1700 GCancellable
*cancellable
,
1709 /* if the queue is empty, there's nothing to wait for */
1710 g_mutex_lock (&worker
->write_lock
);
1711 if (g_queue_get_length (worker
->write_queue
) > 0)
1713 data
= g_new0 (FlushData
, 1);
1714 g_mutex_init (&data
->mutex
);
1715 g_cond_init (&data
->cond
);
1716 data
->number_to_wait_for
= worker
->write_num_messages_written
+ g_queue_get_length (worker
->write_queue
);
1717 g_mutex_lock (&data
->mutex
);
1718 worker
->write_pending_flushes
= g_list_prepend (worker
->write_pending_flushes
, data
);
1720 g_mutex_unlock (&worker
->write_lock
);
1724 g_cond_wait (&data
->cond
, &data
->mutex
);
1725 g_mutex_unlock (&data
->mutex
);
1727 /* note:the element is removed from worker->write_pending_flushes in flush_cb() above */
1728 g_cond_clear (&data
->cond
);
1729 g_mutex_clear (&data
->mutex
);
1730 if (data
->error
!= NULL
)
1733 g_propagate_error (error
, data
->error
);
1741 /* ---------------------------------------------------------------------------------------------------- */
1743 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
1744 #define G_DBUS_DEBUG_TRANSPORT (1<<1)
1745 #define G_DBUS_DEBUG_MESSAGE (1<<2)
1746 #define G_DBUS_DEBUG_PAYLOAD (1<<3)
1747 #define G_DBUS_DEBUG_CALL (1<<4)
1748 #define G_DBUS_DEBUG_SIGNAL (1<<5)
1749 #define G_DBUS_DEBUG_INCOMING (1<<6)
1750 #define G_DBUS_DEBUG_RETURN (1<<7)
1751 #define G_DBUS_DEBUG_EMISSION (1<<8)
1752 #define G_DBUS_DEBUG_ADDRESS (1<<9)
1754 static gint _gdbus_debug_flags
= 0;
1757 _g_dbus_debug_authentication (void)
1759 _g_dbus_initialize ();
1760 return (_gdbus_debug_flags
& G_DBUS_DEBUG_AUTHENTICATION
) != 0;
1764 _g_dbus_debug_transport (void)
1766 _g_dbus_initialize ();
1767 return (_gdbus_debug_flags
& G_DBUS_DEBUG_TRANSPORT
) != 0;
1771 _g_dbus_debug_message (void)
1773 _g_dbus_initialize ();
1774 return (_gdbus_debug_flags
& G_DBUS_DEBUG_MESSAGE
) != 0;
1778 _g_dbus_debug_payload (void)
1780 _g_dbus_initialize ();
1781 return (_gdbus_debug_flags
& G_DBUS_DEBUG_PAYLOAD
) != 0;
1785 _g_dbus_debug_call (void)
1787 _g_dbus_initialize ();
1788 return (_gdbus_debug_flags
& G_DBUS_DEBUG_CALL
) != 0;
1792 _g_dbus_debug_signal (void)
1794 _g_dbus_initialize ();
1795 return (_gdbus_debug_flags
& G_DBUS_DEBUG_SIGNAL
) != 0;
1799 _g_dbus_debug_incoming (void)
1801 _g_dbus_initialize ();
1802 return (_gdbus_debug_flags
& G_DBUS_DEBUG_INCOMING
) != 0;
1806 _g_dbus_debug_return (void)
1808 _g_dbus_initialize ();
1809 return (_gdbus_debug_flags
& G_DBUS_DEBUG_RETURN
) != 0;
1813 _g_dbus_debug_emission (void)
1815 _g_dbus_initialize ();
1816 return (_gdbus_debug_flags
& G_DBUS_DEBUG_EMISSION
) != 0;
1820 _g_dbus_debug_address (void)
1822 _g_dbus_initialize ();
1823 return (_gdbus_debug_flags
& G_DBUS_DEBUG_ADDRESS
) != 0;
1826 G_LOCK_DEFINE_STATIC (print_lock
);
1829 _g_dbus_debug_print_lock (void)
1831 G_LOCK (print_lock
);
1835 _g_dbus_debug_print_unlock (void)
1837 G_UNLOCK (print_lock
);
1841 * _g_dbus_initialize:
1843 * Does various one-time init things such as
1845 * - registering the G_DBUS_ERROR error domain
1846 * - parses the G_DBUS_DEBUG environment variable
1849 _g_dbus_initialize (void)
1851 static volatile gsize initialized
= 0;
1853 if (g_once_init_enter (&initialized
))
1855 volatile GQuark g_dbus_error_domain
;
1858 g_dbus_error_domain
= G_DBUS_ERROR
;
1859 (g_dbus_error_domain
); /* To avoid -Wunused-but-set-variable */
1861 debug
= g_getenv ("G_DBUS_DEBUG");
1864 const GDebugKey keys
[] = {
1865 { "authentication", G_DBUS_DEBUG_AUTHENTICATION
},
1866 { "transport", G_DBUS_DEBUG_TRANSPORT
},
1867 { "message", G_DBUS_DEBUG_MESSAGE
},
1868 { "payload", G_DBUS_DEBUG_PAYLOAD
},
1869 { "call", G_DBUS_DEBUG_CALL
},
1870 { "signal", G_DBUS_DEBUG_SIGNAL
},
1871 { "incoming", G_DBUS_DEBUG_INCOMING
},
1872 { "return", G_DBUS_DEBUG_RETURN
},
1873 { "emission", G_DBUS_DEBUG_EMISSION
},
1874 { "address", G_DBUS_DEBUG_ADDRESS
}
1877 _gdbus_debug_flags
= g_parse_debug_string (debug
, keys
, G_N_ELEMENTS (keys
));
1878 if (_gdbus_debug_flags
& G_DBUS_DEBUG_PAYLOAD
)
1879 _gdbus_debug_flags
|= G_DBUS_DEBUG_MESSAGE
;
1882 g_once_init_leave (&initialized
, 1);
1886 /* ---------------------------------------------------------------------------------------------------- */
1889 _g_dbus_compute_complete_signature (GDBusArgInfo
**args
)
1891 const GVariantType
*arg_types
[256];
1895 for (n
= 0; args
[n
] != NULL
; n
++)
1897 /* DBus places a hard limit of 255 on signature length.
1898 * therefore number of args must be less than 256.
1902 arg_types
[n
] = G_VARIANT_TYPE (args
[n
]->signature
);
1904 if G_UNLIKELY (arg_types
[n
] == NULL
)
1910 return g_variant_type_new_tuple (arg_types
, n
);
1913 /* ---------------------------------------------------------------------------------------------------- */
1917 extern BOOL WINAPI
ConvertSidToStringSidA (PSID Sid
, LPSTR
*StringSid
);
1920 _g_dbus_win32_get_user_sid (void)
1924 DWORD token_information_len
;
1931 h
= INVALID_HANDLE_VALUE
;
1933 if (!OpenProcessToken (GetCurrentProcess (), TOKEN_QUERY
, &h
))
1935 g_warning ("OpenProcessToken failed with error code %d", (gint
) GetLastError ());
1939 /* Get length of buffer */
1940 token_information_len
= 0;
1941 if (!GetTokenInformation (h
, TokenUser
, NULL
, 0, &token_information_len
))
1943 if (GetLastError () != ERROR_INSUFFICIENT_BUFFER
)
1945 g_warning ("GetTokenInformation() failed with error code %d", (gint
) GetLastError ());
1949 user
= g_malloc (token_information_len
);
1950 if (!GetTokenInformation (h
, TokenUser
, user
, token_information_len
, &token_information_len
))
1952 g_warning ("GetTokenInformation() failed with error code %d", (gint
) GetLastError ());
1956 psid
= user
->User
.Sid
;
1957 if (!IsValidSid (psid
))
1959 g_warning ("Invalid SID");
1963 if (!ConvertSidToStringSidA (psid
, &sid
))
1965 g_warning ("Invalid SID");
1969 ret
= g_strdup (sid
);
1974 if (h
!= INVALID_HANDLE_VALUE
)
1980 /* ---------------------------------------------------------------------------------------------------- */
1983 _g_dbus_get_machine_id (GError
**error
)
1986 /* TODO: use PACKAGE_LOCALSTATEDIR ? */
1988 if (!g_file_get_contents ("/var/lib/dbus/machine-id",
1993 g_prefix_error (error
, _("Unable to load /var/lib/dbus/machine-id: "));
1997 /* TODO: validate value */
2003 /* ---------------------------------------------------------------------------------------------------- */
2006 _g_dbus_enum_to_string (GType enum_type
, gint value
)
2010 GEnumValue
*enum_value
;
2012 klass
= g_type_class_ref (enum_type
);
2013 enum_value
= g_enum_get_value (klass
, value
);
2014 if (enum_value
!= NULL
)
2015 ret
= g_strdup (enum_value
->value_nick
);
2017 ret
= g_strdup_printf ("unknown (value %d)", value
);
2018 g_type_class_unref (klass
);
2022 /* ---------------------------------------------------------------------------------------------------- */
2025 write_message_print_transport_debug (gssize bytes_written
,
2026 MessageToWriteData
*data
)
2028 if (G_LIKELY (!_g_dbus_debug_transport ()))
2031 _g_dbus_debug_print_lock ();
2032 g_print ("========================================================================\n"
2033 "GDBus-debug:Transport:\n"
2034 " >>>> WROTE %" G_GSIZE_FORMAT
" bytes of message with serial %d and\n"
2035 " size %" G_GSIZE_FORMAT
" from offset %" G_GSIZE_FORMAT
" on a %s\n",
2037 g_dbus_message_get_serial (data
->message
),
2039 data
->total_written
,
2040 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_output_stream (data
->worker
->stream
))));
2041 _g_dbus_debug_print_unlock ();
2046 /* ---------------------------------------------------------------------------------------------------- */
2049 read_message_print_transport_debug (gssize bytes_read
,
2050 GDBusWorker
*worker
)
2054 gint32 message_length
;
2056 if (G_LIKELY (!_g_dbus_debug_transport ()))
2059 size
= bytes_read
+ worker
->read_buffer_cur_size
;
2063 message_length
= g_dbus_message_bytes_needed ((guchar
*) worker
->read_buffer
, size
, NULL
);
2066 switch (worker
->read_buffer
[0])
2070 serial
= GUINT32_FROM_LE (((guint32
*) worker
->read_buffer
)[2]);
2074 serial
= GUINT32_FROM_BE (((guint32
*) worker
->read_buffer
)[2]);
2077 /* an error will be set elsewhere if this happens */
2082 _g_dbus_debug_print_lock ();
2083 g_print ("========================================================================\n"
2084 "GDBus-debug:Transport:\n"
2085 " <<<< READ %" G_GSIZE_FORMAT
" bytes of message with serial %d and\n"
2086 " size %d to offset %" G_GSIZE_FORMAT
" from a %s\n",
2090 worker
->read_buffer_cur_size
,
2091 g_type_name (G_TYPE_FROM_INSTANCE (g_io_stream_get_input_stream (worker
->stream
))));
2092 _g_dbus_debug_print_unlock ();
2097 /* ---------------------------------------------------------------------------------------------------- */
2100 _g_signal_accumulator_false_handled (GSignalInvocationHint
*ihint
,
2101 GValue
*return_accu
,
2102 const GValue
*handler_return
,
2105 gboolean continue_emission
;
2106 gboolean signal_return
;
2108 signal_return
= g_value_get_boolean (handler_return
);
2109 g_value_set_boolean (return_accu
, signal_return
);
2110 continue_emission
= signal_return
;
2112 return continue_emission
;