Revert 198312 "Effects Pepper Plugin and MediaStream Glue."
[chromium-blink-merge.git] / dbus / bus.cc
blobea0497d45cf8b053102e5b99fa7bfba0283d4c36
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "dbus/bus.h"
7 #include "base/bind.h"
8 #include "base/logging.h"
9 #include "base/message_loop.h"
10 #include "base/message_loop_proxy.h"
11 #include "base/stl_util.h"
12 #include "base/threading/thread.h"
13 #include "base/threading/thread_restrictions.h"
14 #include "base/time.h"
15 #include "dbus/exported_object.h"
16 #include "dbus/object_manager.h"
17 #include "dbus/object_path.h"
18 #include "dbus/object_proxy.h"
19 #include "dbus/scoped_dbus_error.h"
21 namespace dbus {
23 namespace {
// Member name of the signal that is matched against incoming messages in
// Bus::OnConnectionDisconnectedFilter().
const char kDisconnectedSignal[] = "Disconnected";

// Match rule registered in Bus::Connect() and removed again in
// Bus::ShutdownAndBlock().
const char kDisconnectedMatchRule[] =
    "type='signal', path='/org/freedesktop/DBus/Local',"
    "interface='org.freedesktop.DBus.Local', member='Disconnected'";
30 // The class is used for watching the file descriptor used for D-Bus
31 // communication.
32 class Watch : public base::MessagePumpLibevent::Watcher {
33 public:
34 explicit Watch(DBusWatch* watch)
35 : raw_watch_(watch) {
36 dbus_watch_set_data(raw_watch_, this, NULL);
39 virtual ~Watch() {
40 dbus_watch_set_data(raw_watch_, NULL, NULL);
43 // Returns true if the underlying file descriptor is ready to be watched.
44 bool IsReadyToBeWatched() {
45 return dbus_watch_get_enabled(raw_watch_);
48 // Starts watching the underlying file descriptor.
49 void StartWatching() {
50 const int file_descriptor = dbus_watch_get_unix_fd(raw_watch_);
51 const int flags = dbus_watch_get_flags(raw_watch_);
53 base::MessageLoopForIO::Mode mode = base::MessageLoopForIO::WATCH_READ;
54 if ((flags & DBUS_WATCH_READABLE) && (flags & DBUS_WATCH_WRITABLE))
55 mode = base::MessageLoopForIO::WATCH_READ_WRITE;
56 else if (flags & DBUS_WATCH_READABLE)
57 mode = base::MessageLoopForIO::WATCH_READ;
58 else if (flags & DBUS_WATCH_WRITABLE)
59 mode = base::MessageLoopForIO::WATCH_WRITE;
60 else
61 NOTREACHED();
63 const bool persistent = true; // Watch persistently.
64 const bool success = base::MessageLoopForIO::current()->WatchFileDescriptor(
65 file_descriptor, persistent, mode, &file_descriptor_watcher_, this);
66 CHECK(success) << "Unable to allocate memory";
69 // Stops watching the underlying file descriptor.
70 void StopWatching() {
71 file_descriptor_watcher_.StopWatchingFileDescriptor();
74 private:
75 // Implement MessagePumpLibevent::Watcher.
76 virtual void OnFileCanReadWithoutBlocking(int file_descriptor) OVERRIDE {
77 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_READABLE);
78 CHECK(success) << "Unable to allocate memory";
81 // Implement MessagePumpLibevent::Watcher.
82 virtual void OnFileCanWriteWithoutBlocking(int file_descriptor) OVERRIDE {
83 const bool success = dbus_watch_handle(raw_watch_, DBUS_WATCH_WRITABLE);
84 CHECK(success) << "Unable to allocate memory";
87 DBusWatch* raw_watch_;
88 base::MessagePumpLibevent::FileDescriptorWatcher file_descriptor_watcher_;
91 // The class is used for monitoring the timeout used for D-Bus method
92 // calls.
94 // Unlike Watch, Timeout is a ref counted object, to ensure that |this| of
95 // the object is is alive when HandleTimeout() is called. It's unlikely
96 // but it may be possible that HandleTimeout() is called after
97 // Bus::OnRemoveTimeout(). That's why we don't simply delete the object in
98 // Bus::OnRemoveTimeout().
99 class Timeout : public base::RefCountedThreadSafe<Timeout> {
100 public:
101 explicit Timeout(DBusTimeout* timeout)
102 : raw_timeout_(timeout),
103 monitoring_is_active_(false),
104 is_completed(false) {
105 dbus_timeout_set_data(raw_timeout_, this, NULL);
106 AddRef(); // Balanced on Complete().
109 // Returns true if the timeout is ready to be monitored.
110 bool IsReadyToBeMonitored() {
111 return dbus_timeout_get_enabled(raw_timeout_);
114 // Starts monitoring the timeout.
115 void StartMonitoring(dbus::Bus* bus) {
116 bus->PostDelayedTaskToDBusThread(FROM_HERE,
117 base::Bind(&Timeout::HandleTimeout,
118 this),
119 GetInterval());
120 monitoring_is_active_ = true;
123 // Stops monitoring the timeout.
124 void StopMonitoring() {
125 // We cannot take back the delayed task we posted in
126 // StartMonitoring(), so we just mark the monitoring is inactive now.
127 monitoring_is_active_ = false;
130 // Returns the interval.
131 base::TimeDelta GetInterval() {
132 return base::TimeDelta::FromMilliseconds(
133 dbus_timeout_get_interval(raw_timeout_));
136 // Cleans up the raw_timeout and marks that timeout is completed.
137 // See the class comment above for why we are doing this.
138 void Complete() {
139 dbus_timeout_set_data(raw_timeout_, NULL, NULL);
140 is_completed = true;
141 Release();
144 private:
145 friend class base::RefCountedThreadSafe<Timeout>;
146 ~Timeout() {
149 // Handles the timeout.
150 void HandleTimeout() {
151 // If the timeout is marked completed, we should do nothing. This can
152 // occur if this function is called after Bus::OnRemoveTimeout().
153 if (is_completed)
154 return;
155 // Skip if monitoring is canceled.
156 if (!monitoring_is_active_)
157 return;
159 const bool success = dbus_timeout_handle(raw_timeout_);
160 CHECK(success) << "Unable to allocate memory";
163 DBusTimeout* raw_timeout_;
164 bool monitoring_is_active_;
165 bool is_completed;
168 } // namespace
170 Bus::Options::Options()
171 : bus_type(SESSION),
172 connection_type(PRIVATE) {
175 Bus::Options::~Options() {
178 Bus::Bus(const Options& options)
179 : bus_type_(options.bus_type),
180 connection_type_(options.connection_type),
181 dbus_task_runner_(options.dbus_task_runner),
182 on_shutdown_(false /* manual_reset */, false /* initially_signaled */),
183 connection_(NULL),
184 origin_thread_id_(base::PlatformThread::CurrentId()),
185 async_operations_set_up_(false),
186 shutdown_completed_(false),
187 num_pending_watches_(0),
188 num_pending_timeouts_(0),
189 address_(options.address),
190 on_disconnected_closure_(options.disconnected_callback) {
191 // This is safe to call multiple times.
192 dbus_threads_init_default();
193 // The origin message loop is unnecessary if the client uses synchronous
194 // functions only.
195 if (base::MessageLoop::current())
196 origin_task_runner_ = base::MessageLoop::current()->message_loop_proxy();
199 Bus::~Bus() {
200 DCHECK(!connection_);
201 DCHECK(owned_service_names_.empty());
202 DCHECK(match_rules_added_.empty());
203 DCHECK(filter_functions_added_.empty());
204 DCHECK(registered_object_paths_.empty());
205 DCHECK_EQ(0, num_pending_watches_);
206 // TODO(satorux): This check fails occasionally in browser_tests for tests
207 // that run very quickly. Perhaps something does not have time to clean up.
208 // Despite the check failing, the tests seem to run fine. crosbug.com/23416
209 // DCHECK_EQ(0, num_pending_timeouts_);
212 ObjectProxy* Bus::GetObjectProxy(const std::string& service_name,
213 const ObjectPath& object_path) {
214 return GetObjectProxyWithOptions(service_name, object_path,
215 ObjectProxy::DEFAULT_OPTIONS);
218 ObjectProxy* Bus::GetObjectProxyWithOptions(const std::string& service_name,
219 const dbus::ObjectPath& object_path,
220 int options) {
221 AssertOnOriginThread();
223 // Check if we already have the requested object proxy.
224 const ObjectProxyTable::key_type key(service_name + object_path.value(),
225 options);
226 ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
227 if (iter != object_proxy_table_.end()) {
228 return iter->second;
231 scoped_refptr<ObjectProxy> object_proxy =
232 new ObjectProxy(this, service_name, object_path, options);
233 object_proxy_table_[key] = object_proxy;
235 return object_proxy.get();
238 bool Bus::RemoveObjectProxy(const std::string& service_name,
239 const ObjectPath& object_path,
240 const base::Closure& callback) {
241 return RemoveObjectProxyWithOptions(service_name, object_path,
242 ObjectProxy::DEFAULT_OPTIONS,
243 callback);
246 bool Bus::RemoveObjectProxyWithOptions(const std::string& service_name,
247 const dbus::ObjectPath& object_path,
248 int options,
249 const base::Closure& callback) {
250 AssertOnOriginThread();
252 // Check if we have the requested object proxy.
253 const ObjectProxyTable::key_type key(service_name + object_path.value(),
254 options);
255 ObjectProxyTable::iterator iter = object_proxy_table_.find(key);
256 if (iter != object_proxy_table_.end()) {
257 // Object is present. Remove it now and Detach in the DBus thread.
258 PostTaskToDBusThread(FROM_HERE, base::Bind(
259 &Bus::RemoveObjectProxyInternal,
260 this, iter->second, callback));
262 object_proxy_table_.erase(iter);
263 return true;
265 return false;
268 void Bus::RemoveObjectProxyInternal(
269 scoped_refptr<dbus::ObjectProxy> object_proxy,
270 const base::Closure& callback) {
271 AssertOnDBusThread();
273 object_proxy.get()->Detach();
275 PostTaskToOriginThread(FROM_HERE, callback);
278 ExportedObject* Bus::GetExportedObject(const ObjectPath& object_path) {
279 AssertOnOriginThread();
281 // Check if we already have the requested exported object.
282 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
283 if (iter != exported_object_table_.end()) {
284 return iter->second;
287 scoped_refptr<ExportedObject> exported_object =
288 new ExportedObject(this, object_path);
289 exported_object_table_[object_path] = exported_object;
291 return exported_object.get();
294 void Bus::UnregisterExportedObject(const ObjectPath& object_path) {
295 AssertOnOriginThread();
297 // Remove the registered object from the table first, to allow a new
298 // GetExportedObject() call to return a new object, rather than this one.
299 ExportedObjectTable::iterator iter = exported_object_table_.find(object_path);
300 if (iter == exported_object_table_.end())
301 return;
303 scoped_refptr<ExportedObject> exported_object = iter->second;
304 exported_object_table_.erase(iter);
306 // Post the task to perform the final unregistration to the D-Bus thread.
307 // Since the registration also happens on the D-Bus thread in
308 // TryRegisterObjectPath(), and the task runner we post to is a
309 // SequencedTaskRunner, there is a guarantee that this will happen before any
310 // future registration call.
311 PostTaskToDBusThread(FROM_HERE,
312 base::Bind(&Bus::UnregisterExportedObjectInternal,
313 this, exported_object));
316 void Bus::UnregisterExportedObjectInternal(
317 scoped_refptr<dbus::ExportedObject> exported_object) {
318 AssertOnDBusThread();
320 exported_object->Unregister();
323 ObjectManager* Bus::GetObjectManager(const std::string& service_name,
324 const ObjectPath& object_path) {
325 AssertOnOriginThread();
327 // Check if we already have the requested object manager.
328 const ObjectManagerTable::key_type key(service_name + object_path.value());
329 ObjectManagerTable::iterator iter = object_manager_table_.find(key);
330 if (iter != object_manager_table_.end()) {
331 return iter->second;
334 scoped_refptr<ObjectManager> object_manager =
335 new ObjectManager(this, service_name, object_path);
336 object_manager_table_[key] = object_manager;
338 return object_manager.get();
341 void Bus::RemoveObjectManager(const std::string& service_name,
342 const ObjectPath& object_path) {
343 AssertOnOriginThread();
345 const ObjectManagerTable::key_type key(service_name + object_path.value());
346 ObjectManagerTable::iterator iter = object_manager_table_.find(key);
347 if (iter == object_manager_table_.end())
348 return;
350 scoped_refptr<ObjectManager> object_manager = iter->second;
351 object_manager_table_.erase(iter);
354 void Bus::GetManagedObjects() {
355 for (ObjectManagerTable::iterator iter = object_manager_table_.begin();
356 iter != object_manager_table_.end(); ++iter) {
357 iter->second->GetManagedObjects();
361 bool Bus::Connect() {
362 // dbus_bus_get_private() and dbus_bus_get() are blocking calls.
363 AssertOnDBusThread();
365 // Check if it's already initialized.
366 if (connection_)
367 return true;
369 ScopedDBusError error;
370 if (bus_type_ == CUSTOM_ADDRESS) {
371 if (connection_type_ == PRIVATE) {
372 connection_ = dbus_connection_open_private(address_.c_str(), error.get());
373 } else {
374 connection_ = dbus_connection_open(address_.c_str(), error.get());
376 } else {
377 const DBusBusType dbus_bus_type = static_cast<DBusBusType>(bus_type_);
378 if (connection_type_ == PRIVATE) {
379 connection_ = dbus_bus_get_private(dbus_bus_type, error.get());
380 } else {
381 connection_ = dbus_bus_get(dbus_bus_type, error.get());
384 if (!connection_) {
385 LOG(ERROR) << "Failed to connect to the bus: "
386 << (error.is_set() ? error.message() : "");
387 return false;
390 if (bus_type_ == CUSTOM_ADDRESS) {
391 // We should call dbus_bus_register here, otherwise unique name can not be
392 // acquired. According to dbus specification, it is responsible to call
393 // org.freedesktop.DBus.Hello method at the beging of bus connection to
394 // acquire unique name. In the case of dbus_bus_get, dbus_bus_register is
395 // called internally.
396 if (!dbus_bus_register(connection_, error.get())) {
397 LOG(ERROR) << "Failed to register the bus component: "
398 << (error.is_set() ? error.message() : "");
399 return false;
402 // We shouldn't exit on the disconnected signal.
403 dbus_connection_set_exit_on_disconnect(connection_, false);
405 // Watch Disconnected signal.
406 AddFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
407 AddMatch(kDisconnectedMatchRule, error.get());
409 return true;
412 void Bus::ClosePrivateConnection() {
413 // dbus_connection_close is blocking call.
414 AssertOnDBusThread();
415 DCHECK_EQ(PRIVATE, connection_type_)
416 << "non-private connection should not be closed";
417 dbus_connection_close(connection_);
420 void Bus::ShutdownAndBlock() {
421 AssertOnDBusThread();
423 if (shutdown_completed_)
424 return; // Already shutdowned, just return.
426 // Unregister the exported objects.
427 for (ExportedObjectTable::iterator iter = exported_object_table_.begin();
428 iter != exported_object_table_.end(); ++iter) {
429 iter->second->Unregister();
432 // Release all service names.
433 for (std::set<std::string>::iterator iter = owned_service_names_.begin();
434 iter != owned_service_names_.end();) {
435 // This is a bit tricky but we should increment the iter here as
436 // ReleaseOwnership() may remove |service_name| from the set.
437 const std::string& service_name = *iter++;
438 ReleaseOwnership(service_name);
440 if (!owned_service_names_.empty()) {
441 LOG(ERROR) << "Failed to release all service names. # of services left: "
442 << owned_service_names_.size();
445 // Detach from the remote objects.
446 for (ObjectProxyTable::iterator iter = object_proxy_table_.begin();
447 iter != object_proxy_table_.end(); ++iter) {
448 iter->second->Detach();
451 // Release object proxies and exported objects here. We should do this
452 // here rather than in the destructor to avoid memory leaks due to
453 // cyclic references.
454 object_proxy_table_.clear();
455 exported_object_table_.clear();
457 // Private connection should be closed.
458 if (connection_) {
459 // Remove Disconnected watcher.
460 ScopedDBusError error;
461 RemoveFilterFunction(Bus::OnConnectionDisconnectedFilter, this);
462 RemoveMatch(kDisconnectedMatchRule, error.get());
464 if (connection_type_ == PRIVATE)
465 ClosePrivateConnection();
466 // dbus_connection_close() won't unref.
467 dbus_connection_unref(connection_);
470 connection_ = NULL;
471 shutdown_completed_ = true;
474 void Bus::ShutdownOnDBusThreadAndBlock() {
475 AssertOnOriginThread();
476 DCHECK(dbus_task_runner_.get());
478 PostTaskToDBusThread(FROM_HERE, base::Bind(
479 &Bus::ShutdownOnDBusThreadAndBlockInternal,
480 this));
482 // http://crbug.com/125222
483 base::ThreadRestrictions::ScopedAllowWait allow_wait;
485 // Wait until the shutdown is complete on the D-Bus thread.
486 // The shutdown should not hang, but set timeout just in case.
487 const int kTimeoutSecs = 3;
488 const base::TimeDelta timeout(base::TimeDelta::FromSeconds(kTimeoutSecs));
489 const bool signaled = on_shutdown_.TimedWait(timeout);
490 LOG_IF(ERROR, !signaled) << "Failed to shutdown the bus";
493 void Bus::RequestOwnership(const std::string& service_name,
494 OnOwnershipCallback on_ownership_callback) {
495 AssertOnOriginThread();
497 PostTaskToDBusThread(FROM_HERE, base::Bind(
498 &Bus::RequestOwnershipInternal,
499 this, service_name, on_ownership_callback));
502 void Bus::RequestOwnershipInternal(const std::string& service_name,
503 OnOwnershipCallback on_ownership_callback) {
504 AssertOnDBusThread();
506 bool success = Connect();
507 if (success)
508 success = RequestOwnershipAndBlock(service_name);
510 PostTaskToOriginThread(FROM_HERE,
511 base::Bind(on_ownership_callback,
512 service_name,
513 success));
516 bool Bus::RequestOwnershipAndBlock(const std::string& service_name) {
517 DCHECK(connection_);
518 // dbus_bus_request_name() is a blocking call.
519 AssertOnDBusThread();
521 // Check if we already own the service name.
522 if (owned_service_names_.find(service_name) != owned_service_names_.end()) {
523 return true;
526 ScopedDBusError error;
527 const int result = dbus_bus_request_name(connection_,
528 service_name.c_str(),
529 DBUS_NAME_FLAG_DO_NOT_QUEUE,
530 error.get());
531 if (result != DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER) {
532 LOG(ERROR) << "Failed to get the ownership of " << service_name << ": "
533 << (error.is_set() ? error.message() : "");
534 return false;
536 owned_service_names_.insert(service_name);
537 return true;
540 bool Bus::ReleaseOwnership(const std::string& service_name) {
541 DCHECK(connection_);
542 // dbus_bus_request_name() is a blocking call.
543 AssertOnDBusThread();
545 // Check if we already own the service name.
546 std::set<std::string>::iterator found =
547 owned_service_names_.find(service_name);
548 if (found == owned_service_names_.end()) {
549 LOG(ERROR) << service_name << " is not owned by the bus";
550 return false;
553 ScopedDBusError error;
554 const int result = dbus_bus_release_name(connection_, service_name.c_str(),
555 error.get());
556 if (result == DBUS_RELEASE_NAME_REPLY_RELEASED) {
557 owned_service_names_.erase(found);
558 return true;
559 } else {
560 LOG(ERROR) << "Failed to release the ownership of " << service_name << ": "
561 << (error.is_set() ? error.message() : "");
562 return false;
566 bool Bus::SetUpAsyncOperations() {
567 DCHECK(connection_);
568 AssertOnDBusThread();
570 if (async_operations_set_up_)
571 return true;
573 // Process all the incoming data if any, so that OnDispatchStatus() will
574 // be called when the incoming data is ready.
575 ProcessAllIncomingDataIfAny();
577 bool success = dbus_connection_set_watch_functions(connection_,
578 &Bus::OnAddWatchThunk,
579 &Bus::OnRemoveWatchThunk,
580 &Bus::OnToggleWatchThunk,
581 this,
582 NULL);
583 CHECK(success) << "Unable to allocate memory";
585 success = dbus_connection_set_timeout_functions(connection_,
586 &Bus::OnAddTimeoutThunk,
587 &Bus::OnRemoveTimeoutThunk,
588 &Bus::OnToggleTimeoutThunk,
589 this,
590 NULL);
591 CHECK(success) << "Unable to allocate memory";
593 dbus_connection_set_dispatch_status_function(
594 connection_,
595 &Bus::OnDispatchStatusChangedThunk,
596 this,
597 NULL);
599 async_operations_set_up_ = true;
601 return true;
604 DBusMessage* Bus::SendWithReplyAndBlock(DBusMessage* request,
605 int timeout_ms,
606 DBusError* error) {
607 DCHECK(connection_);
608 AssertOnDBusThread();
610 return dbus_connection_send_with_reply_and_block(
611 connection_, request, timeout_ms, error);
614 void Bus::SendWithReply(DBusMessage* request,
615 DBusPendingCall** pending_call,
616 int timeout_ms) {
617 DCHECK(connection_);
618 AssertOnDBusThread();
620 const bool success = dbus_connection_send_with_reply(
621 connection_, request, pending_call, timeout_ms);
622 CHECK(success) << "Unable to allocate memory";
625 void Bus::Send(DBusMessage* request, uint32* serial) {
626 DCHECK(connection_);
627 AssertOnDBusThread();
629 const bool success = dbus_connection_send(connection_, request, serial);
630 CHECK(success) << "Unable to allocate memory";
633 bool Bus::AddFilterFunction(DBusHandleMessageFunction filter_function,
634 void* user_data) {
635 DCHECK(connection_);
636 AssertOnDBusThread();
638 std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
639 std::make_pair(filter_function, user_data);
640 if (filter_functions_added_.find(filter_data_pair) !=
641 filter_functions_added_.end()) {
642 VLOG(1) << "Filter function already exists: " << filter_function
643 << " with associated data: " << user_data;
644 return false;
647 const bool success = dbus_connection_add_filter(
648 connection_, filter_function, user_data, NULL);
649 CHECK(success) << "Unable to allocate memory";
650 filter_functions_added_.insert(filter_data_pair);
651 return true;
654 bool Bus::RemoveFilterFunction(DBusHandleMessageFunction filter_function,
655 void* user_data) {
656 DCHECK(connection_);
657 AssertOnDBusThread();
659 std::pair<DBusHandleMessageFunction, void*> filter_data_pair =
660 std::make_pair(filter_function, user_data);
661 if (filter_functions_added_.find(filter_data_pair) ==
662 filter_functions_added_.end()) {
663 VLOG(1) << "Requested to remove an unknown filter function: "
664 << filter_function
665 << " with associated data: " << user_data;
666 return false;
669 dbus_connection_remove_filter(connection_, filter_function, user_data);
670 filter_functions_added_.erase(filter_data_pair);
671 return true;
674 void Bus::AddMatch(const std::string& match_rule, DBusError* error) {
675 DCHECK(connection_);
676 AssertOnDBusThread();
678 std::map<std::string, int>::iterator iter =
679 match_rules_added_.find(match_rule);
680 if (iter != match_rules_added_.end()) {
681 // The already existing rule's counter is incremented.
682 iter->second++;
684 VLOG(1) << "Match rule already exists: " << match_rule;
685 return;
688 dbus_bus_add_match(connection_, match_rule.c_str(), error);
689 match_rules_added_[match_rule] = 1;
692 bool Bus::RemoveMatch(const std::string& match_rule, DBusError* error) {
693 DCHECK(connection_);
694 AssertOnDBusThread();
696 std::map<std::string, int>::iterator iter =
697 match_rules_added_.find(match_rule);
698 if (iter == match_rules_added_.end()) {
699 LOG(ERROR) << "Requested to remove an unknown match rule: " << match_rule;
700 return false;
703 // The rule's counter is decremented and the rule is deleted when reachs 0.
704 iter->second--;
705 if (iter->second == 0) {
706 dbus_bus_remove_match(connection_, match_rule.c_str(), error);
707 match_rules_added_.erase(match_rule);
709 return true;
712 bool Bus::TryRegisterObjectPath(const ObjectPath& object_path,
713 const DBusObjectPathVTable* vtable,
714 void* user_data,
715 DBusError* error) {
716 DCHECK(connection_);
717 AssertOnDBusThread();
719 if (registered_object_paths_.find(object_path) !=
720 registered_object_paths_.end()) {
721 LOG(ERROR) << "Object path already registered: " << object_path.value();
722 return false;
725 const bool success = dbus_connection_try_register_object_path(
726 connection_,
727 object_path.value().c_str(),
728 vtable,
729 user_data,
730 error);
731 if (success)
732 registered_object_paths_.insert(object_path);
733 return success;
736 void Bus::UnregisterObjectPath(const ObjectPath& object_path) {
737 DCHECK(connection_);
738 AssertOnDBusThread();
740 if (registered_object_paths_.find(object_path) ==
741 registered_object_paths_.end()) {
742 LOG(ERROR) << "Requested to unregister an unknown object path: "
743 << object_path.value();
744 return;
747 const bool success = dbus_connection_unregister_object_path(
748 connection_,
749 object_path.value().c_str());
750 CHECK(success) << "Unable to allocate memory";
751 registered_object_paths_.erase(object_path);
754 void Bus::ShutdownOnDBusThreadAndBlockInternal() {
755 AssertOnDBusThread();
757 ShutdownAndBlock();
758 on_shutdown_.Signal();
761 void Bus::ProcessAllIncomingDataIfAny() {
762 AssertOnDBusThread();
764 // As mentioned at the class comment in .h file, connection_ can be NULL.
765 if (!connection_)
766 return;
768 // It is safe and necessary to call dbus_connection_get_dispatch_status even
769 // if the connection is lost. Otherwise we will miss "Disconnected" signal.
770 // (crbug.com/174431)
771 if (dbus_connection_get_dispatch_status(connection_) ==
772 DBUS_DISPATCH_DATA_REMAINS) {
773 while (dbus_connection_dispatch(connection_) ==
774 DBUS_DISPATCH_DATA_REMAINS);
778 void Bus::PostTaskToOriginThread(const tracked_objects::Location& from_here,
779 const base::Closure& task) {
780 DCHECK(origin_task_runner_.get());
781 if (!origin_task_runner_->PostTask(from_here, task)) {
782 LOG(WARNING) << "Failed to post a task to the origin message loop";
786 void Bus::PostTaskToDBusThread(const tracked_objects::Location& from_here,
787 const base::Closure& task) {
788 if (dbus_task_runner_.get()) {
789 if (!dbus_task_runner_->PostTask(from_here, task)) {
790 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
792 } else {
793 DCHECK(origin_task_runner_.get());
794 if (!origin_task_runner_->PostTask(from_here, task)) {
795 LOG(WARNING) << "Failed to post a task to the origin message loop";
800 void Bus::PostDelayedTaskToDBusThread(
801 const tracked_objects::Location& from_here,
802 const base::Closure& task,
803 base::TimeDelta delay) {
804 if (dbus_task_runner_.get()) {
805 if (!dbus_task_runner_->PostDelayedTask(
806 from_here, task, delay)) {
807 LOG(WARNING) << "Failed to post a task to the D-Bus thread message loop";
809 } else {
810 DCHECK(origin_task_runner_.get());
811 if (!origin_task_runner_->PostDelayedTask(from_here, task, delay)) {
812 LOG(WARNING) << "Failed to post a task to the origin message loop";
817 bool Bus::HasDBusThread() {
818 return dbus_task_runner_.get() != NULL;
821 void Bus::AssertOnOriginThread() {
822 DCHECK_EQ(origin_thread_id_, base::PlatformThread::CurrentId());
825 void Bus::AssertOnDBusThread() {
826 base::ThreadRestrictions::AssertIOAllowed();
828 if (dbus_task_runner_.get()) {
829 DCHECK(dbus_task_runner_->RunsTasksOnCurrentThread());
830 } else {
831 AssertOnOriginThread();
835 dbus_bool_t Bus::OnAddWatch(DBusWatch* raw_watch) {
836 AssertOnDBusThread();
838 // watch will be deleted when raw_watch is removed in OnRemoveWatch().
839 Watch* watch = new Watch(raw_watch);
840 if (watch->IsReadyToBeWatched()) {
841 watch->StartWatching();
843 ++num_pending_watches_;
844 return true;
847 void Bus::OnRemoveWatch(DBusWatch* raw_watch) {
848 AssertOnDBusThread();
850 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
851 delete watch;
852 --num_pending_watches_;
855 void Bus::OnToggleWatch(DBusWatch* raw_watch) {
856 AssertOnDBusThread();
858 Watch* watch = static_cast<Watch*>(dbus_watch_get_data(raw_watch));
859 if (watch->IsReadyToBeWatched()) {
860 watch->StartWatching();
861 } else {
862 // It's safe to call this if StartWatching() wasn't called, per
863 // message_pump_libevent.h.
864 watch->StopWatching();
868 dbus_bool_t Bus::OnAddTimeout(DBusTimeout* raw_timeout) {
869 AssertOnDBusThread();
871 // timeout will be deleted when raw_timeout is removed in
872 // OnRemoveTimeoutThunk().
873 Timeout* timeout = new Timeout(raw_timeout);
874 if (timeout->IsReadyToBeMonitored()) {
875 timeout->StartMonitoring(this);
877 ++num_pending_timeouts_;
878 return true;
881 void Bus::OnRemoveTimeout(DBusTimeout* raw_timeout) {
882 AssertOnDBusThread();
884 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
885 timeout->Complete();
886 --num_pending_timeouts_;
889 void Bus::OnToggleTimeout(DBusTimeout* raw_timeout) {
890 AssertOnDBusThread();
892 Timeout* timeout = static_cast<Timeout*>(dbus_timeout_get_data(raw_timeout));
893 if (timeout->IsReadyToBeMonitored()) {
894 timeout->StartMonitoring(this);
895 } else {
896 timeout->StopMonitoring();
900 void Bus::OnDispatchStatusChanged(DBusConnection* connection,
901 DBusDispatchStatus status) {
902 DCHECK_EQ(connection, connection_);
903 AssertOnDBusThread();
905 // We cannot call ProcessAllIncomingDataIfAny() here, as calling
906 // dbus_connection_dispatch() inside DBusDispatchStatusFunction is
907 // prohibited by the D-Bus library. Hence, we post a task here instead.
908 // See comments for dbus_connection_set_dispatch_status_function().
909 PostTaskToDBusThread(FROM_HERE,
910 base::Bind(&Bus::ProcessAllIncomingDataIfAny,
911 this));
914 void Bus::OnConnectionDisconnected(DBusConnection* connection) {
915 AssertOnDBusThread();
917 if (!on_disconnected_closure_.is_null())
918 PostTaskToOriginThread(FROM_HERE, on_disconnected_closure_);
920 if (!connection)
921 return;
922 DCHECK(!dbus_connection_get_is_connected(connection));
924 ShutdownAndBlock();
927 dbus_bool_t Bus::OnAddWatchThunk(DBusWatch* raw_watch, void* data) {
928 Bus* self = static_cast<Bus*>(data);
929 return self->OnAddWatch(raw_watch);
932 void Bus::OnRemoveWatchThunk(DBusWatch* raw_watch, void* data) {
933 Bus* self = static_cast<Bus*>(data);
934 self->OnRemoveWatch(raw_watch);
937 void Bus::OnToggleWatchThunk(DBusWatch* raw_watch, void* data) {
938 Bus* self = static_cast<Bus*>(data);
939 self->OnToggleWatch(raw_watch);
942 dbus_bool_t Bus::OnAddTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
943 Bus* self = static_cast<Bus*>(data);
944 return self->OnAddTimeout(raw_timeout);
947 void Bus::OnRemoveTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
948 Bus* self = static_cast<Bus*>(data);
949 self->OnRemoveTimeout(raw_timeout);
952 void Bus::OnToggleTimeoutThunk(DBusTimeout* raw_timeout, void* data) {
953 Bus* self = static_cast<Bus*>(data);
954 self->OnToggleTimeout(raw_timeout);
957 void Bus::OnDispatchStatusChangedThunk(DBusConnection* connection,
958 DBusDispatchStatus status,
959 void* data) {
960 Bus* self = static_cast<Bus*>(data);
961 self->OnDispatchStatusChanged(connection, status);
964 DBusHandlerResult Bus::OnConnectionDisconnectedFilter(
965 DBusConnection* connection,
966 DBusMessage* message,
967 void* data) {
968 if (dbus_message_is_signal(message,
969 DBUS_INTERFACE_LOCAL,
970 kDisconnectedSignal)) {
971 Bus* self = static_cast<Bus*>(data);
972 self->AssertOnDBusThread();
973 self->OnConnectionDisconnected(connection);
974 return DBUS_HANDLER_RESULT_HANDLED;
976 return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
979 } // namespace dbus