# Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
# Copyright (C) 2005-2007 Collabora Ltd. <http://www.collabora.co.uk/>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use, copy,
# modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
# Normalised paths of the build tree and the source tree, read from the
# environment.  Indexing os.environ directly means a missing variable fails
# immediately with KeyError instead of silently testing the wrong tree.
builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
# Set the root logger's threshold to 1, which is below logging.DEBUG (10),
# so that no record is filtered out by level.
logging.getLogger().setLevel(1)
# Logger shared by every test in this module.
logger = logging.getLogger('test-signals')
# Abort early unless the modules under test were imported from the trees
# named by DBUS_TOP_SRCDIR / DBUS_TOP_BUILDDIR rather than from some other
# location on the import path.
# NOTE(review): the binding of `pkg` is not present in the text this block
# was restored from; `dbus.__file__` is assumed by analogy with the
# `_dbus_bindings.__file__` check below -- confirm.
pkg = dbus.__file__
if not pkg.startswith(pydir):
    raise Exception(
        "DBus modules (%s) are not being picked up from the package" % pkg)

if not _dbus_bindings.__file__.startswith(builddir):
    raise Exception(
        "DBus modules (%s) are not being picked up from the package"
        % _dbus_bindings.__file__)
# Bus name, interface name and object path passed to get_object() and
# dbus.Interface() by the tests below.
NAME = "org.freedesktop.DBus.TestSuitePythonService"
IFACE = "org.freedesktop.DBus.TestSuiteInterface"
OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
class TestSignals(unittest.TestCase):
    """Signal tests against the object exported under NAME on the session bus.

    Every test delegates to signal_test_impl(), which relies on the
    module-level ``main_loop`` to wait for either the signal or a timeout.
    """

    def setUp(self):
        """Create proxies and interfaces for each object path under test."""
        logger.info('setUp()')
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object(NAME, OBJECT)
        # NOTE(review): the path suffix of this proxy was missing from the
        # text this block was restored from; '/Fallback' is assumed as the
        # parent of the '/Fallback/Badger' path used below -- confirm.
        self.remote_object_fallback_trivial = self.bus.get_object(
            NAME, OBJECT + '/Fallback')
        self.remote_object_fallback = self.bus.get_object(
            NAME, OBJECT + '/Fallback/Badger')
        self.remote_object_follow = self.bus.get_object(
            NAME, OBJECT, follow_name_owner_changes=True)
        self.iface = dbus.Interface(self.remote_object, IFACE)
        self.iface_follow = dbus.Interface(self.remote_object_follow, IFACE)
        self.fallback_iface = dbus.Interface(
            self.remote_object_fallback, IFACE)
        self.fallback_trivial_iface = dbus.Interface(
            self.remote_object_fallback_trivial, IFACE)
        # Name of the phase currently running; the handlers in
        # signal_test_impl() compare against it to ignore stale callbacks.
        self.in_test = None

    def signal_test_impl(self, iface, name, test_removal=False):
        """Check that 'SignalOneString' is delivered through *iface*.

        iface        -- dbus.Interface used both to connect to the signal
                        and to ask the service to emit it.
        name         -- label for this run; used in log messages and as the
                        value of self.in_test.
        test_removal -- if true, additionally check that no signal is
                        delivered once the match has been removed.

        Raises AssertionError if the signal does not arrive within one
        second, or if it still arrives after the match was removed.
        """
        self.in_test = name
        # using append rather than assignment here to avoid scoping issues
        result = []

        def _timeout_handler():
            logger.debug('_timeout_handler for %s: current state %s',
                         name, self.in_test)
            # Only stop the loop if this run is still the active phase.
            if self.in_test == name:
                main_loop.quit()

        def _signal_handler(s, sender, path):
            logger.debug('_signal_handler for %s: current state %s',
                         name, self.in_test)
            if self.in_test not in (name, name + '+removed'):
                return
            logger.info('Received signal from %s:%s, argument is %r',
                        sender, path, s)
            result.append('received')
            main_loop.quit()

        def _rm_timeout_handler():
            logger.debug('_timeout_handler for %s: current state %s',
                         name, self.in_test)
            if self.in_test == name + '+removed':
                main_loop.quit()

        logger.info('Testing %s', name)
        # NOTE(review): path_keyword was missing from the text this block
        # was restored from; it is assumed because _signal_handler takes a
        # `path` argument -- confirm.
        match = iface.connect_to_signal('SignalOneString', _signal_handler,
                                        sender_keyword='sender',
                                        path_keyword='path')
        logger.info('Waiting for signal...')
        iface.EmitSignal('SignalOneString', 0)
        source_id = gobject.timeout_add(1000, _timeout_handler)
        main_loop.run()
        if not result:
            raise AssertionError('Signal did not arrive within 1 second')
        logger.debug('Removing match')
        match.remove()
        gobject.source_remove(source_id)

        if test_removal:
            self.in_test = name + '+removed'
            logger.info('Testing %s', name)
            # Empty the list in place so _signal_handler keeps appending to
            # the same object that is checked below.
            del result[:]
            iface.EmitSignal('SignalOneString', 0)
            source_id = gobject.timeout_add(1000, _rm_timeout_handler)
            main_loop.run()
            if result:
                raise AssertionError('Signal should not have arrived, but did')
            gobject.source_remove(source_id)

    # The test methods below carry comments rather than docstrings: unittest
    # prints a test's docstring in place of its name in verbose output.

    # Object below the fallback path.
    def testFallback(self):
        self.signal_test_impl(self.fallback_iface, 'Fallback')

    # The fallback path itself.
    def testFallbackTrivial(self):
        self.signal_test_impl(self.fallback_trivial_iface, 'FallbackTrivial')

    # Plain proxy: delivery only, then delivery plus match removal, each
    # run twice under different labels.
    def testSignal(self):
        self.signal_test_impl(self.iface, 'Signal')

    def testRemoval(self):
        self.signal_test_impl(self.iface, 'Removal', True)

    def testSignalAgain(self):
        self.signal_test_impl(self.iface, 'SignalAgain')

    def testRemovalAgain(self):
        self.signal_test_impl(self.iface, 'RemovalAgain', True)

    # Same four checks through the proxy created with
    # follow_name_owner_changes=True.
    def testSignalF(self):
        self.signal_test_impl(self.iface_follow, 'Signal')

    def testRemovalF(self):
        self.signal_test_impl(self.iface_follow, 'Removal', True)

    def testSignalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'SignalAgain')

    def testRemovalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'RemovalAgain', True)
154 if __name__
== '__main__':
155 main_loop
= gobject
.MainLoop()
156 gobject
.threads_init()
157 dbus
.glib
.init_threads()
159 logger
.info('Starting unit test')