Roll WebKit 141933:141963
[chromium-blink-merge.git] / dbus / bus.cc
blob4096049417884f76044a275adbadb5e060f59636
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 //
5 // TODO(satorux):
6 // - Handle "disconnected" signal.
8 #include "dbus/bus.h"
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/message_loop.h"
13 #include "base/message_loop_proxy.h"
14 #include "base/stl_util.h"
15 #include "base/threading/thread.h"
16 #include "base/threading/thread_restrictions.h"
17 #include "base/time.h"
18 #include "dbus/exported_object.h"
19 #include "dbus/object_path.h"
20 #include "dbus/object_proxy.h"
21 #include "dbus/scoped_dbus_error.h"
23 namespace dbus {
25 namespace {
27 // The class is used for watching the file descriptor used for D-Bus
28 // communication.
29 class Watch : public base::MessagePumpLibevent::Watcher {
30 public:
31 Watch(DBusWatch* watch)
32 : raw_watch_(watch) {
33 dbus_watch_set_data(raw_watch_, this, NULL);
36 virtual ~Watch() {
37 dbus_watch_set_data(raw_watch_, NULL, NULL);
40 // Returns true if the underlying file descriptor is ready to be watched.
41 bool IsReadyToBeWatched() {
42 return dbus_watch_get_enabled(raw_watch_);
45 // Starts watching the underlying file descriptor.
46 void StartWatching() {
47 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
48 const int flags = dbus_watch_get_flags(raw_watch_);
50 MessageLoopForIO::Mode mode = MessageLoopForIO::WATCH_READ;
51 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
52 mode = MessageLoopForIO::WATCH_READ_WRITE;
53 else if (flags & DBUS_WATCH_READABLE)
54 mode = MessageLoopForIO::WATCH_READ;
55 else if (flags & DBUS_WATCH_WRITABLE)
56 mode = MessageLoopForIO::WATCH_WRITE;
57 else
58 NOTREACHED();
60 const bool persistent = true; // Watch persistently.
61 const bool success = MessageLoopForIO::current()->WatchFileDescriptor(
62 file_descriptor,
63 persistent,
64 mode,
65 &file_descriptor_watcher_,
66 this);
67 CHECK(success) << "Unable to allocate memory";
70 // Stops watching the underlying file descriptor.
71 void StopWatching() {
72 file_descriptor_watcher_.StopWatchingFileDescriptor();
75 private:
76 // Implement MessagePumpLibevent::Watcher.
77 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
78 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
79 CHECK(success) << "Unable to allocate memory";
82 // Implement MessagePumpLibevent::Watcher.
83 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
84 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
85 CHECK(success) << "Unable to allocate memory";
88 DBusWatch* raw_watch_;
89 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
92 // The class is used for monitoring the timeout used for D-Bus method
93 // calls.
95 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
96 // the object is is alive when HandleTimeout() is called. It's unlikely
97 // but it may be possible that HandleTimeout() is called after
98 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
99 // Bus::OnRemoveTimeout().
100 class Timeout : public base::RefCountedThreadSafe<Timeout> {
101 public:
102 Timeout(DBusTimeout* timeout)
103 : raw_timeout_(timeout),
104 monitoring_is_active_(false),
105 is_completed(false) {
106 dbus_timeout_set_data(raw_timeout_, this, NULL);
107 AddRef(); // Balanced on Complete().
110 // Returns true if the timeout is ready to be monitored.
111 bool IsReadyToBeMonitored() {
112 return dbus_timeout_get_enabled(raw_timeout_);
115 // Starts monitoring the timeout.
116 void StartMonitoring(dbus::Bus* bus) {
117 bus->PostDelayedTaskToDBusThread(FROM_HERE,
118 base::Bind(&Timeout::HandleTimeout,
119 this),
120 GetInterval());
121 monitoring_is_active_ = true;
124 // Stops monitoring the timeout.
125 void StopMonitoring() {
126 // We cannot take back the delayed task we posted in
127 // StartMonitoring(), so we just mark the monitoring is inactive now.
128 monitoring_is_active_ = false;
131 // Returns the interval.
132 base::TimeDelta GetInterval() {
133 return base::TimeDelta::FromMilliseconds(
134 dbus_timeout_get_interval(raw_timeout_));
137 // Cleans up the raw_timeout and marks that timeout is completed.
138 // See the class comment above for why we are doing this.
139 void Complete() {
140 dbus_timeout_set_data(raw_timeout_, NULL, NULL);
141 is_completed = true;
142 Release();
145 private:
146 friend class base::RefCountedThreadSafe<Timeout>;
147 ~Timeout() {
150 // Handles the timeout.
151 void HandleTimeout() {
152 // If the timeout is marked completed, we should do nothing. This can
153 // occur if this function is called after Bus::OnRemoveTimeout().
154 if (is_completed)
155 return;
156 // Skip if monitoring is canceled.
157 if (!monitoring_is_active_)
158 return;
160 const bool success = dbus_timeout_handle(raw_timeout_);
161 CHECK(success) << "Unable to allocate memory";
164 DBusTimeout* raw_timeout_;
165 bool monitoring_is_active_;
166 bool is_completed;
169 } // namespace
171 Bus::Options::Options()
172 : bus_type(SESSION),
173 connection_type(PRIVATE) {
176 Bus::Options::~Options() {
179 Bus::Bus(const Options& options)
180 : bus_type_(options.bus_type),
181 connection_type_(options.connection_type),
182 dbus_thread_message_loop_proxy_(options.dbus_thread_message_loop_proxy),
183 on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
184 connection_(NULL),
185 origin_thread_id_(base::PlatformThread::CurrentId()),
186 async_operations_set_up_(false),
187 shutdown_completed_(false),
188 num_pending_watches_(0),
189 num_pending_timeouts_(0),
190 address_(options.address) {
191 // This is safe to call multiple times.
192 dbus_threads_init_default();
193 // The origin message loop is unnecessary if the client uses synchronous
194 // functions only.
195 if (MessageLoop::current())
196 origin_message_loop_proxy_ = MessageLoop::current()->message_loop_proxy();
199 Bus::~Bus() {
200 DCHECK(!connection_);
201 DCHECK(owned_service_names_.empty());
202 DCHECK(match_rules_added_.empty());
203 DCHECK(filter_functions_added_.empty());
204 DCHECK(registered_object_paths_.empty());
205 DCHECK_EQ(0, num_pending_watches_);
206 // TODO(satorux): This check fails occasionally in browser_tests for tests
207 // that run very quickly. Perhaps something does not have time to clean up.
208 // Despite the check failing, the tests seem to run fine. crosbug.com/23416
209 // DCHECK_EQ(0, num_pending_timeouts_);
212 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
213 const ObjectPath& object_path) {
214 return GetObjectProxyWithOptions(service_name, object_path,
215 ObjectProxy::DEFAULT_OPTIONS);
218 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
219 const dbus::ObjectPath& object_path,
220 int options) {
221 AssertOnOriginThread();
223 // Check if we already have the requested object proxy.
224 const ObjectProxyTable::key_type key(service_name + object_path.value(),
225 options);
226 ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
227 if (iter != object_proxy_table_.end()) {
228 return iter->second;
231 scoped_refptr<ObjectProxy> object_proxy =
232 new ObjectProxy(this, service_name, object_path, options);
233 object_proxy_table_[key] = object_proxy;
235 return object_proxy.get();
238 bool Bus::RemoveObjectProxy(const std::string& service_name,
239 const ObjectPath& object_path,
240 const base::Closure& callback) {
241 return RemoveObjectProxyWithOptions(service_name, object_path,
242 ObjectProxy::DEFAULT_OPTIONS,
243 callback);
246 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
247 const dbus::ObjectPath& object_path,
248 int options,
249 const base::Closure& callback) {
250 AssertOnOriginThread();
252 // Check if we have the requested object proxy.
253 const ObjectProxyTable::key_type key(service_name + object_path.value(),
254 options);
255 ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
256 if (iter != object_proxy_table_.end()) {
257 // Object is present. Remove it now and Detach in the DBus thread.
258 PostTaskToDBusThread(FROM_HERE, base::Bind(
259 &Bus::RemoveObjectProxyInternal,
260 this, iter->second, callback));
262 object_proxy_table_.erase(iter);
263 return true;
265 return false;
268 void Bus::RemoveObjectProxyInternal(
269 scoped_refptr<dbus::ObjectProxy> object_proxy,
270 const base::Closure& callback) {
271 AssertOnDBusThread();
273 object_proxy.get()->Detach();
275 PostTaskToOriginThread(FROM_HERE, callback);
278 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
279 AssertOnOriginThread();
281 // Check if we already have the requested exported object.
282 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
283 if (iter != exported_object_table_.end()) {
284 return iter->second;
287 scoped_refptr<ExportedObject> exported_object =
288 new ExportedObject(this, object_path);
289 exported_object_table_[object_path] = exported_object;
291 return exported_object.get();
294 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
295 AssertOnOriginThread();
297 // Remove the registered object from the table first, to allow a new
298 // GetExportedObject() call to return a new object, rather than this one.
299 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
300 if (iter == exported_object_table_.end())
301 return;
303 scoped_refptr<ExportedObject> exported_object = iter->second;
304 exported_object_table_.erase(iter);
306 // Post the task to perform the final unregistration to the D-Bus thread.
307 // Since the registration also happens on the D-Bus thread in
308 // TryRegisterObjectPath(), and the message loop proxy we post to is a
309 // MessageLoopProxy which inherits from SequencedTaskRunner, there is a
310 // guarantee that this will happen before any future registration call.
311 PostTaskToDBusThread(FROM_HERE, base::Bind(
312 &Bus::UnregisterExportedObjectInternal,
313 this, exported_object));
316 void Bus::UnregisterExportedObjectInternal(
317 scoped_refptr<dbus::ExportedObject> exported_object) {
318 AssertOnDBusThread();
320 exported_object->Unregister();
323 bool Bus::Connect() {
324 // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
325 AssertOnDBusThread();
327 // Check if it's already initialized.
328 if (connection_)
329 return true;
331 ScopedDBusError error;
332 if (bus_type_ == CUSTOM_ADDRESS) {
333 if (connection_type_ == PRIVATE) {
334 connection_ = dbus_connection_open_private(address_.c_str(), error.get());
335 } else {
336 connection_ = dbus_connection_open(address_.c_str(), error.get());
338 } else {
339 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
340 if (connection_type_ == PRIVATE) {
341 connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
342 } else {
343 connection_ = dbus_bus_get(dbus_bus_type, error.get());
346 if (!connection_) {
347 LOG(ERROR) << "Failed to connect to the bus: "
348 << (error.is_set() ? error.message() : "");
349 return false;
352 if (bus_type_ == CUSTOM_ADDRESS) {
353 // We should call dbus_bus_register here, otherwise unique name can not be
354 // acquired. According to dbus specification, it is responsible to call
355 // org.freedesktop.DBus.Hello method at the beging of bus connection to
356 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
357 // called internally.
358 if (!dbus_bus_register(connection_, error.get())) {
359 LOG(ERROR) << "Failed to register the bus component: "
360 << (error.is_set() ? error.message() : "");
361 return false;
364 // We shouldn't exit on the disconnected signal.
365 dbus_connection_set_exit_on_disconnect(connection_, false);
367 return true;
370 void Bus::ShutdownAndBlock() {
371 AssertOnDBusThread();
373 // Unregister the exported objects.
374 for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
375 iter != exported_object_table_.end(); ++iter) {
376 iter->second->Unregister();
379 // Release all service names.
380 for (std::set<std::string>::iterator iter = owned_service_names_.begin();
381 iter != owned_service_names_.end();) {
382 // This is a bit tricky but we should increment the iter here as
383 // ReleaseOwnership() may remove |service_name| from the set.
384 const std::string& service_name = *iter++;
385 ReleaseOwnership(service_name);
387 if (!owned_service_names_.empty()) {
388 LOG(ERROR) << "Failed to release all service names. # of services left: "
389 << owned_service_names_.size();
392 // Detach from the remote objects.
393 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
394 iter != object_proxy_table_.end(); ++iter) {
395 iter->second->Detach();
398 // Release object proxies and exported objects here. We should do this
399 // here rather than in the destructor to avoid memory leaks due to
400 // cyclic references.
401 object_proxy_table_.clear();
402 exported_object_table_.clear();
404 // Private connection should be closed.
405 if (connection_) {
406 if (connection_type_ == PRIVATE)
407 dbus_connection_close(connection_);
408 // dbus_connection_close() won't unref.
409 dbus_connection_unref(connection_);
412 connection_ = NULL;
413 shutdown_completed_ = true;
416 void Bus::ShutdownOnDBusThreadAndBlock() {
417 AssertOnOriginThread();
418 DCHECK(dbus_thread_message_loop_proxy_.get());
420 PostTaskToDBusThread(FROM_HERE, base::Bind(
421 &Bus::ShutdownOnDBusThreadAndBlockInternal,
422 this));
424 // http://crbug.com/125222
425 base::ThreadRestrictions::ScopedAllowWait allow_wait;
427 // Wait until the shutdown is complete on the D-Bus thread.
428 // The shutdown should not hang, but set timeout just in case.
429 const int kTimeoutSecs = 3;
430 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
431 const bool signaled = on_shutdown_.TimedWait(timeout);
432 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
435 void Bus::RequestOwnership(const std::string& service_name,
436 OnOwnershipCallback on_ownership_callback) {
437 AssertOnOriginThread();
439 PostTaskToDBusThread(FROM_HERE, base::Bind(
440 &Bus::RequestOwnershipInternal,
441 this, service_name, on_ownership_callback));
444 void Bus::RequestOwnershipInternal(const std::string& service_name,
445 OnOwnershipCallback on_ownership_callback) {
446 AssertOnDBusThread();
448 bool success = Connect();
449 if (success)
450 success = RequestOwnershipAndBlock(service_name);
452 PostTaskToOriginThread(FROM_HERE,
453 base::Bind(on_ownership_callback,
454 service_name,
455 success));
458 bool Bus::RequestOwnershipAndBlock(const std::string& service_name) {
459 DCHECK(connection_);
460 // dbus_bus_request_name() is a blocking call.
461 AssertOnDBusThread();
463 // Check if we already own the service name.
464 if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
465 return true;
468 ScopedDBusError error;
469 const int result = dbus_bus_request_name(connection_,
470 service_name.c_str(),
471 DBUS_NAME_FLAG_DO_NOT_QUEUE,
472 error.get());
473 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
474 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
475 << (error.is_set() ? error.message() : "");
476 return false;
478 owned_service_names_.insert(service_name);
479 return true;
482 bool Bus::ReleaseOwnership(const std::string& service_name) {
483 DCHECK(connection_);
484 // dbus_bus_request_name() is a blocking call.
485 AssertOnDBusThread();
487 // Check if we already own the service name.
488 std::set<std::string>::iterator found =
489 owned_service_names_.find(service_name);
490 if (found == owned_service_names_.end()) {
491 LOG(ERROR) << service_name << " is not owned by the bus";
492 return false;
495 ScopedDBusError error;
496 const int result = dbus_bus_release_name(connection_, service_name.c_str(),
497 error.get());
498 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
499 owned_service_names_.erase(found);
500 return true;
501 } else {
502 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
503 << (error.is_set() ? error.message() : "");
504 return false;
508 bool Bus::SetUpAsyncOperations() {
509 DCHECK(connection_);
510 AssertOnDBusThread();
512 if (async_operations_set_up_)
513 return true;
515 // Process all the incoming data if any, so that OnDispatchStatus() will
516 // be called when the incoming data is ready.
517 ProcessAllIncomingDataIfAny();
519 bool success = dbus_connection_set_watch_functions(connection_,
520 &Bus::OnAddWatchThunk,
521 &Bus::OnRemoveWatchThunk,
522 &Bus::OnToggleWatchThunk,
523 this,
524 NULL);
525 CHECK(success) << "Unable to allocate memory";
527 success = dbus_connection_set_timeout_functions(connection_,
528 &Bus::OnAddTimeoutThunk,
529 &Bus::OnRemoveTimeoutThunk,
530 &Bus::OnToggleTimeoutThunk,
531 this,
532 NULL);
533 CHECK(success) << "Unable to allocate memory";
535 dbus_connection_set_dispatch_status_function(
536 connection_,
537 &Bus::OnDispatchStatusChangedThunk,
538 this,
539 NULL);
541 async_operations_set_up_ = true;
543 return true;
546 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
547 int timeout_ms,
548 DBusError* error) {
549 DCHECK(connection_);
550 AssertOnDBusThread();
552 return dbus_connection_send_with_reply_and_block(
553 connection_, request, timeout_ms, error);
556 void Bus::SendWithReply(DBusMessage* request,
557 DBusPendingCall** pending_call,
558 int timeout_ms) {
559 DCHECK(connection_);
560 AssertOnDBusThread();
562 const bool success = dbus_connection_send_with_reply(
563 connection_, request, pending_call, timeout_ms);
564 CHECK(success) << "Unable to allocate memory";
567 void Bus::Send(DBusMessage* request, uint32* serial) {
568 DCHECK(connection_);
569 AssertOnDBusThread();
571 const bool success = dbus_connection_send(connection_, request, serial);
572 CHECK(success) << "Unable to allocate memory";
575 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
576 void* user_data) {
577 DCHECK(connection_);
578 AssertOnDBusThread();
580 std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
581 std::make_pair(filter_function, user_data);
582 if (filter_functions_added_.find(filter_data_pair) !=
583 filter_functions_added_.end()) {
584 VLOG(1) << "Filter function already exists: " << filter_function
585 << " with associated data: " << user_data;
586 return false;
589 const bool success = dbus_connection_add_filter(
590 connection_, filter_function, user_data, NULL);
591 CHECK(success) << "Unable to allocate memory";
592 filter_functions_added_.insert(filter_data_pair);
593 return true;
596 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
597 void* user_data) {
598 DCHECK(connection_);
599 AssertOnDBusThread();
601 std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
602 std::make_pair(filter_function, user_data);
603 if (filter_functions_added_.find(filter_data_pair) ==
604 filter_functions_added_.end()) {
605 VLOG(1) << "Requested to remove an unknown filter function: "
606 << filter_function
607 << " with associated data: " << user_data;
608 return false;
611 dbus_connection_remove_filter(connection_, filter_function, user_data);
612 filter_functions_added_.erase(filter_data_pair);
613 return true;
616 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
617 DCHECK(connection_);
618 AssertOnDBusThread();
620 std::map<std::string, int>::iterator iter =
621 match_rules_added_.find(match_rule);
622 if (iter != match_rules_added_.end()) {
623 // The already existing rule's counter is incremented.
624 iter->second++;
626 VLOG(1) << "Match rule already exists: " << match_rule;
627 return;
630 dbus_bus_add_match(connection_, match_rule.c_str(), error);
631 match_rules_added_[match_rule] = 1;
634 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
635 DCHECK(connection_);
636 AssertOnDBusThread();
638 std::map<std::string, int>::iterator iter =
639 match_rules_added_.find(match_rule);
640 if (iter == match_rules_added_.end()) {
641 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
642 return false;
645 // The rule's counter is decremented and the rule is deleted when reachs 0.
646 iter->second--;
647 if (iter->second == 0) {
648 dbus_bus_remove_match(connection_, match_rule.c_str(), error);
649 match_rules_added_.erase(match_rule);
651 return true;
654 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
655 const DBusObjectPathVTable* vtable,
656 void* user_data,
657 DBusError* error) {
658 DCHECK(connection_);
659 AssertOnDBusThread();
661 if (registered_object_paths_.find(object_path) !=
662 registered_object_paths_.end()) {
663 LOG(ERROR) << "Object path already registered: " << object_path.value();
664 return false;
667 const bool success = dbus_connection_try_register_object_path(
668 connection_,
669 object_path.value().c_str(),
670 vtable,
671 user_data,
672 error);
673 if (success)
674 registered_object_paths_.insert(object_path);
675 return success;
678 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
679 DCHECK(connection_);
680 AssertOnDBusThread();
682 if (registered_object_paths_.find(object_path) ==
683 registered_object_paths_.end()) {
684 LOG(ERROR) << "Requested to unregister an unknown object path: "
685 << object_path.value();
686 return;
689 const bool success = dbus_connection_unregister_object_path(
690 connection_,
691 object_path.value().c_str());
692 CHECK(success) << "Unable to allocate memory";
693 registered_object_paths_.erase(object_path);
696 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
697 AssertOnDBusThread();
699 ShutdownAndBlock();
700 on_shutdown_.Signal();
703 void Bus::ProcessAllIncomingDataIfAny() {
704 AssertOnDBusThread();
706 // As mentioned at the class comment in .h file, connection_ can be NULL.
707 if (!connection_ || !dbus_connection_get_is_connected(connection_))
708 return;
710 if (dbus_connection_get_dispatch_status(connection_) ==
711 DBUS_DISPATCH_DATA_REMAINS) {
712 while (dbus_connection_dispatch(connection_) ==
713 DBUS_DISPATCH_DATA_REMAINS);
717 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
718 const base::Closure& task) {
719 DCHECK(origin_message_loop_proxy_.get());
720 if (!origin_message_loop_proxy_->PostTask(from_here, task)) {
721 LOG(WARNING) << "Failed to post a task to the origin message loop";
725 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
726 const base::Closure& task) {
727 if (dbus_thread_message_loop_proxy_.get()) {
728 if (!dbus_thread_message_loop_proxy_->PostTask(from_here, task)) {
729 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
731 } else {
732 DCHECK(origin_message_loop_proxy_.get());
733 if (!origin_message_loop_proxy_->PostTask(from_here, task)) {
734 LOG(WARNING) << "Failed to post a task to the origin message loop";
739 void Bus::PostDelayedTaskToDBusThread(
740 const tracked_objects::Location& from_here,
741 const base::Closure& task,
742 base::TimeDelta delay) {
743 if (dbus_thread_message_loop_proxy_.get()) {
744 if (!dbus_thread_message_loop_proxy_->PostDelayedTask(
745 from_here, task, delay)) {
746 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
748 } else {
749 DCHECK(origin_message_loop_proxy_.get());
750 if (!origin_message_loop_proxy_->PostDelayedTask(
751 from_here, task, delay)) {
752 LOG(WARNING) << "Failed to post a task to the origin message loop";
757 bool Bus::HasDBusThread() {
758 return dbus_thread_message_loop_proxy_.get() != NULL;
761 void Bus::AssertOnOriginThread() {
762 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
765 void Bus::AssertOnDBusThread() {
766 base::ThreadRestrictions::AssertIOAllowed();
768 if (dbus_thread_message_loop_proxy_.get()) {
769 DCHECK(dbus_thread_message_loop_proxy_->BelongsToCurrentThread());
770 } else {
771 AssertOnOriginThread();
775 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
776 AssertOnDBusThread();
778 // watch will be deleted when raw_watch is removed in OnRemoveWatch().
779 Watch* watch = new Watch(raw_watch);
780 if (watch->IsReadyToBeWatched()) {
781 watch->StartWatching();
783 ++num_pending_watches_;
784 return true;
787 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
788 AssertOnDBusThread();
790 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
791 delete watch;
792 --num_pending_watches_;
795 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
796 AssertOnDBusThread();
798 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
799 if (watch->IsReadyToBeWatched()) {
800 watch->StartWatching();
801 } else {
802 // It's safe to call this if StartWatching() wasn't called, per
803 // message_pump_libevent.h.
804 watch->StopWatching();
808 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
809 AssertOnDBusThread();
811 // timeout will be deleted when raw_timeout is removed in
812 // OnRemoveTimeoutThunk().
813 Timeout* timeout = new Timeout(raw_timeout);
814 if (timeout->IsReadyToBeMonitored()) {
815 timeout->StartMonitoring(this);
817 ++num_pending_timeouts_;
818 return true;
821 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
822 AssertOnDBusThread();
824 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
825 timeout->Complete();
826 --num_pending_timeouts_;
829 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
830 AssertOnDBusThread();
832 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
833 if (timeout->IsReadyToBeMonitored()) {
834 timeout->StartMonitoring(this);
835 } else {
836 timeout->StopMonitoring();
840 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
841 DBusDispatchStatus status) {
842 DCHECK_EQ(connection, connection_);
843 AssertOnDBusThread();
845 if (!dbus_connection_get_is_connected(connection))
846 return;
848 // We cannot call ProcessAllIncomingDataIfAny() here, as calling
849 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
850 // prohibited by the D-Bus library. Hence, we post a task here instead.
851 // See comments for dbus_connection_set_dispatch_status_function().
852 PostTaskToDBusThread(FROM_HERE,
853 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
854 this));
857 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
858 Bus* self = static_cast<Bus*>(data);
859 return self->OnAddWatch(raw_watch);
862 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
863 Bus* self = static_cast<Bus*>(data);
864 self->OnRemoveWatch(raw_watch);
867 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
868 Bus* self = static_cast<Bus*>(data);
869 self->OnToggleWatch(raw_watch);
872 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
873 Bus* self = static_cast<Bus*>(data);
874 return self->OnAddTimeout(raw_timeout);
877 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
878 Bus* self = static_cast<Bus*>(data);
879 self->OnRemoveTimeout(raw_timeout);
882 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
883 Bus* self = static_cast<Bus*>(data);
884 self->OnToggleTimeout(raw_timeout);
887 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
888 DBusDispatchStatus status,
889 void* data) {
890 Bus* self = static_cast<Bus*>(data);
891 self->OnDispatchStatusChanged(connection, status);
894 } // namespace dbus