Move dbus-py-add-rst2htmlflag.m4 to jh-add-cflag.m4 - the contents of two of the...
[dbus-python-phuang.git] / test / test-signals.py
blob22b4b4befb1e6a4b8fede1ef4350ebe3add3b930
1 #!/usr/bin/env python
3 # Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
4 # Copyright (C) 2005, 2006 Collabora Ltd. <http://www.collabora.co.uk/>
6 # Licensed under the Academic Free License version 2.1
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 import sys
24 import os
25 import unittest
26 import time
27 import logging
# Normalised build-tree and source-tree roots, read from the environment.
# A missing variable raises KeyError, aborting the script before any test runs.
builddir, pydir = (
    os.path.normpath(os.environ[variable])
    for variable in ("DBUS_TOP_BUILDDIR", "DBUS_TOP_SRCDIR")
)
32 import dbus
33 import _dbus_bindings
34 import gobject
35 import dbus.glib
36 import dbus.service
# Send log output through the default handler and let every level through
# (level 1 is below DEBUG), so handler-by-handler traces show up in test logs.
logging.basicConfig()
logging.getLogger().setLevel(1)
logger = logging.getLogger('test-signals')


def _require_under(module_path, expected_root):
    """Raise unless *module_path* starts with the directory *expected_root*."""
    if module_path.startswith(expected_root):
        return
    raise Exception("DBus modules (%s) are not being picked up from the package"%module_path)


# Refuse to run unless the Python package comes from the source tree and the
# C bindings come from the build tree.
pkg = dbus.__file__
_require_under(pkg, pydir)
_require_under(_dbus_bindings.__file__, builddir)

# Bus name, interface and object path used by every test below.
NAME = "org.freedesktop.DBus.TestSuitePythonService"
IFACE = "org.freedesktop.DBus.TestSuiteInterface"
OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
class TestSignals(unittest.TestCase):
    """Signal delivery and match-removal tests run against the session bus.

    Every test goes through signal_test_impl(), varying only the interface
    proxy used and whether removal of the signal match is also checked.
    """

    def setUp(self):
        """Create the proxies and interfaces used by the tests."""
        logger.info('setUp()')
        self.bus = dbus.SessionBus()
        self.remote_object = self.bus.get_object(NAME, OBJECT)
        self.remote_object_fallback_trivial = self.bus.get_object(
            NAME, OBJECT + '/Fallback')
        self.remote_object_fallback = self.bus.get_object(
            NAME, OBJECT + '/Fallback/Badger')
        self.remote_object_follow = self.bus.get_object(
            NAME, OBJECT, follow_name_owner_changes=True)
        self.iface = dbus.Interface(self.remote_object, IFACE)
        self.iface_follow = dbus.Interface(self.remote_object_follow, IFACE)
        self.fallback_iface = dbus.Interface(self.remote_object_fallback, IFACE)
        self.fallback_trivial_iface = dbus.Interface(
            self.remote_object_fallback_trivial, IFACE)
        # Name of the phase currently running; handlers compare against it
        # so that a callback belonging to another phase does nothing.
        self.in_test = None

    def signal_test_impl(self, iface, name, test_removal=False):
        """Emit SignalOneString through *iface* and check it is delivered.

        A handler is connected to ``SignalOneString``, the remote object is
        asked to emit that signal, and the main loop runs until the handler
        is called or a one-second timeout expires.  If *test_removal* is
        true, the signal is emitted a second time after the match has been
        removed, and the handler must not be called again.

        Uses the module-level ``main_loop``, which is created in the
        ``__main__`` section of this script.

        The signal match and the timeout source are released even when an
        assertion fails, so a failing test leaves nothing behind for the
        tests that follow.

        Raises:
            AssertionError: the signal did not arrive within one second, or
                it arrived after the match had been removed.
        """
        self.in_test = name
        # Mutated in place (never rebound) so the nested handler needs no
        # special scoping: "non-empty" means "a signal was received".
        received = []

        def _signal_handler(s, sender, path):
            logger.debug('_signal_handler for %s: current state %s', name, self.in_test)
            if self.in_test not in (name, name + '+removed'):
                return
            logger.info('Received signal from %s:%s, argument is %r',
                        sender, path, s)
            received.append('received')
            main_loop.quit()

        def _run_main_loop(expected_state):
            """Run main_loop until a handler quits it; time out after 1s."""
            timed_out = []

            def _timeout_handler():
                logger.debug('_timeout_handler for %s: current state %s', name, self.in_test)
                timed_out.append(True)
                if self.in_test == expected_state:
                    main_loop.quit()
                # One-shot: a false return value makes GLib destroy the
                # source, so it must not be removed again afterwards.
                return False

            source_id = gobject.timeout_add(1000, _timeout_handler)
            try:
                main_loop.run()
            finally:
                # Only remove the source if it is still alive.  Removing an
                # id whose source has already been destroyed is an error in
                # GLib, and the id may since have been reissued.
                if not timed_out:
                    gobject.source_remove(source_id)

        logger.info('Testing %s', name)
        match = iface.connect_to_signal('SignalOneString', _signal_handler,
                                        sender_keyword='sender',
                                        path_keyword='path')
        try:
            logger.info('Waiting for signal...')
            iface.EmitSignal('SignalOneString', 0)
            _run_main_loop(name)
            if not received:
                raise AssertionError('Signal did not arrive within 1 second')
        finally:
            # Always disconnect, including on failure, so the handler cannot
            # fire during a later test that reuses the same name.
            logger.debug('Removing match')
            match.remove()

        if test_removal:
            self.in_test = name + '+removed'
            logger.info('Testing %s', name)
            del received[:]
            iface.EmitSignal('SignalOneString', 0)
            # Success here means the timeout fires with nothing received.
            _run_main_loop(name + '+removed')
            if received:
                raise AssertionError('Signal should not have arrived, but did')

    def testFallback(self):
        self.signal_test_impl(self.fallback_iface, 'Fallback')

    def testFallbackTrivial(self):
        self.signal_test_impl(self.fallback_trivial_iface, 'FallbackTrivial')

    def testSignal(self):
        self.signal_test_impl(self.iface, 'Signal')

    def testRemoval(self):
        self.signal_test_impl(self.iface, 'Removal', True)

    def testSignalAgain(self):
        self.signal_test_impl(self.iface, 'SignalAgain')

    def testRemovalAgain(self):
        self.signal_test_impl(self.iface, 'RemovalAgain', True)

    # The *F variants repeat the tests above through the proxy created with
    # follow_name_owner_changes=True.
    def testSignalF(self):
        self.signal_test_impl(self.iface_follow, 'Signal')

    def testRemovalF(self):
        self.signal_test_impl(self.iface_follow, 'Removal', True)

    def testSignalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'SignalAgain')

    def testRemovalAgainF(self):
        self.signal_test_impl(self.iface_follow, 'RemovalAgain', True)
if __name__ == '__main__':
    # The test class refers to this loop as a module-level global, so it has
    # to exist before unittest starts running any test.
    main_loop = gobject.MainLoop()

    # Thread initialisation for GObject and for the D-Bus GLib integration,
    # performed in this order before any test runs.
    gobject.threads_init()
    dbus.glib.init_threads()

    logger.info('Starting unit test')
    unittest.main()