_dbus_bindings: debug-impl.h -> debug.c
[dbus-python-phuang.git] / test / cross-test-client.py
blob47dff9bda5c346b629aa0e0695b5d7e15a15099b
1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Licensed under the Academic Free License version 2.1
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program; if not, write to the Free Software
19 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 from sets import Set
22 from time import sleep
23 import logging
25 import gobject
27 from dbus import SessionBus, Interface, Array, Byte, Double, Boolean, ByteArray, Int16, Int32, Int64, UInt16, UInt32, UInt64, String, UTF8String, Struct, Dictionary
28 from dbus.service import BusName
29 import dbus.glib
31 from crosstest import CROSS_TEST_PATH, CROSS_TEST_BUS_NAME,\
32 INTERFACE_SINGLE_TESTS, INTERFACE_TESTS,\
33 INTERFACE_SIGNAL_TESTS, INTERFACE_CALLBACK_TESTS,\
34 SignalTestsImpl
# Install the default logging handler, then lower the root logger's
# threshold to 1 so that messages of every severity are emitted.
logging.basicConfig()
logging.getLogger().setLevel(1)

# Logger used by everything in this script.
logger = logging.getLogger('cross-test-client')
42 class Client(SignalTestsImpl):
43 fail_id = 0
44 expected = Set()
46 def quit(self):
47 for x in self.expected:
48 self.fail_id += 1
49 print "%s fail %d" % (x, self.fail_id)
50 s = "report %d: reply to %s didn't arrive" % (self.fail_id, x)
51 print s
52 logger.error(s)
53 logger.info("asking server to Exit")
54 Interface(self.obj, INTERFACE_TESTS).Exit(reply_handler=self.quit_reply_handler, error_handler=self.quit_error_handler)
55 # if the server doesn't reply we'll just exit anyway
56 gobject.timeout_add(1000, lambda: (loop.quit(), False)[1])
58 def quit_reply_handler(self):
59 logger.info("server says it will exit")
60 loop.quit()
62 def quit_error_handler(self, e):
63 logger.error("error telling server to quit: %s %s",
64 e.__class__, e)
65 loop.quit()
67 @dbus.service.method(INTERFACE_CALLBACK_TESTS, 'qd')
68 def Response(self, input1, input2):
69 logger.info("signal/callback: Response received (%r,%r)",
70 input1, input2)
71 self.expected.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
72 if (input1, input2) != (42, 23):
73 self.fail_id += 1
74 print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS, self.fail_id)
75 s = ("report %d: expected (42,23), got %r"
76 % (self.fail_id, (input1, input2)))
77 logger.error(s)
78 print s
79 else:
80 print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
81 self.quit()
83 def assert_method_matches(self, interface, check_fn, check_arg, member, *args):
84 if_obj = Interface(self.obj, interface)
85 method = getattr(if_obj, member)
86 try:
87 real_ret = method(*args)
88 except Exception, e:
89 self.fail_id += 1
90 print "%s.%s fail %d" % (interface, member, self.fail_id)
91 s = ("report %d: %s.%s%r: raised %r \"%s\""
92 % (self.fail_id, interface, member, args, e, e))
93 print s
94 logger.error(s)
95 __import__('traceback').print_exc()
96 return
97 try:
98 check_fn(real_ret, check_arg)
99 except Exception, e:
100 self.fail_id += 1
101 print "%s.%s fail %d" % (interface, member, self.fail_id)
102 s = ("report %d: %s.%s%r: %s"
103 % (self.fail_id, interface, member, args, e))
104 print s
105 logger.error(s)
106 return
107 print "%s.%s pass" % (interface, member)
109 def assert_method_eq(self, interface, ret, member, *args):
110 def equals(real_ret, exp):
111 if real_ret != exp:
112 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp, exp.__class__, real_ret, real_ret.__class__))
113 if real_ret != exp:
114 raise AssertionError('expected %r, got %r' % (exp, real_ret))
115 if not isinstance(exp, (tuple, type(None))):
116 if real_ret.variant_level != getattr(exp, 'variant_level', 0):
117 raise AssertionError('expected variant_level=%d, got %r with level %d'
118 % (getattr(exp, 'variant_level', 0), real_ret,
119 real_ret.variant_level))
120 if isinstance(exp, list) or isinstance(exp, tuple):
121 for i in xrange(len(exp)):
122 try:
123 equals(real_ret[i], exp[i])
124 except AssertionError, e:
125 if not isinstance(e.args, tuple):
126 e.args = (e.args,)
127 e.args = e.args + ('(at position %d in sequence)' % i,)
128 raise e
129 elif isinstance(exp, dict):
130 for k in exp:
131 try:
132 equals(real_ret[k], exp[k])
133 except AssertionError, e:
134 if not isinstance(e.args, tuple):
135 e.args = (e.args,)
136 e.args = e.args + ('(at key %r in dict)' % k,)
137 raise e
138 self.assert_method_matches(interface, equals, ret, member, *args)
140 def assert_InvertMapping_eq(self, interface, expected, member, mapping):
141 def check(real_ret, exp):
142 for key in exp:
143 if key not in real_ret:
144 raise AssertionError('missing key %r' % key)
145 for key in real_ret:
146 if key not in exp:
147 raise AssertionError('unexpected key %r' % key)
148 got = list(real_ret[key])
149 wanted = list(exp[key])
150 got.sort()
151 wanted.sort()
152 if got != wanted:
153 raise AssertionError('expected %r => %r, got %r'
154 % (key, wanted, got))
155 self.assert_method_matches(interface, check, expected, member, mapping)
157 def triggered_cb(self, param, sender_path):
158 logger.info("method/signal: Triggered(%r) by %r",
159 param, sender_path)
160 self.expected.discard('%s.Trigger' % INTERFACE_TESTS)
161 if sender_path != '/Where/Ever':
162 self.fail_id += 1
163 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
164 s = ("report %d: expected signal from /Where/Ever, got %r"
165 % (self.fail_id, sender_path))
166 print s
167 logger.error(s)
168 elif param != 42:
169 self.fail_id += 1
170 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
171 s = ("report %d: expected signal param 42, got %r"
172 % (self.fail_id, parameter))
173 print s
174 logger.error(s)
175 else:
176 print "%s.Trigger pass" % INTERFACE_TESTS
178 def trigger_returned_cb(self):
179 logger.info('method/signal: Trigger() returned')
180 # Callback tests
181 logger.info("signal/callback: Emitting signal to trigger callback")
182 self.expected.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
183 self.Trigger(UInt16(42), 23.0)
184 logger.info("signal/callback: Emitting signal returned")
186 def run_client(self):
187 bus = SessionBus()
188 obj = bus.get_object(CROSS_TEST_BUS_NAME, CROSS_TEST_PATH)
189 self.obj = obj
191 self.run_synchronous_tests(obj)
193 # Signal tests
194 logger.info("Binding signal handler for Triggered")
195 # FIXME: doesn't seem to work when going via the Interface method
196 # FIXME: should be possible to ask the proxy object for its
197 # bus name
198 bus.add_signal_receiver(self.triggered_cb, 'Triggered',
199 INTERFACE_SIGNAL_TESTS,
200 CROSS_TEST_BUS_NAME,
201 path_keyword='sender_path')
202 logger.info("method/signal: Triggering signal")
203 self.expected.add('%s.Trigger' % INTERFACE_TESTS)
204 Interface(obj, INTERFACE_TESTS).Trigger(u'/Where/Ever', dbus.UInt64(42), reply_handler=self.trigger_returned_cb, error_handler=self.trigger_error_handler)
206 def trigger_error_handler(self, e):
207 logger.error("method/signal: %s %s", e.__class__, e)
208 Interface(self.obj, INTERFACE_TESTS).Exit()
209 self.quit()
211 def run_synchronous_tests(self, obj):
212 # We can't test that coercion works correctly unless the server has
213 # sent us introspection data. Java doesn't :-/
214 have_signatures = True
216 # "Single tests"
217 if have_signatures:
218 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [1, 2, 3])
219 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ['\x01', '\x02', '\x03'])
220 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
221 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ByteArray('\x01\x02\x03'))
223 # Main tests
224 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', String('foo'))
225 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', UTF8String('foo'))
226 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
227 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=23), 'Identity', Byte(42, variant_level=23))
228 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', 42.5)
229 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
231 if have_signatures:
232 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', 'foo')
233 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
234 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', Double(42.5))
235 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
237 for i in (0, 42, 255):
238 self.assert_method_eq(INTERFACE_TESTS, Byte(i), 'IdentityByte', Byte(i))
239 for i in (True, False):
240 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityBool', i)
242 for i in (-0x8000, 0, 42, 0x7fff):
243 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt16', Int16(i))
244 for i in (0, 42, 0xffff):
245 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt16', UInt16(i))
246 for i in (-0x7fffffff-1, 0, 42, 0x7fffffff):
247 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt32', Int32(i))
248 for i in (0L, 42L, 0xffffffffL):
249 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt32', UInt32(i))
250 MANY = 0x8000L * 0x10000L * 0x10000L * 0x10000L
251 for i in (-MANY, 0, 42, MANY-1):
252 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt64', Int64(i))
253 for i in (0, 42, 2*MANY - 1):
254 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt64', UInt64(i))
256 self.assert_method_eq(INTERFACE_TESTS, 42.3, 'IdentityDouble', 42.3)
257 for i in ('', 'foo'):
258 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityString', i)
259 for i in (u'\xa9', '\xc2\xa9'):
260 self.assert_method_eq(INTERFACE_TESTS, u'\xa9', 'IdentityString', i)
262 if have_signatures:
263 self.assert_method_eq(INTERFACE_TESTS, Byte(0x42), 'IdentityByte', '\x42')
264 self.assert_method_eq(INTERFACE_TESTS, True, 'IdentityBool', 42)
265 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt16', 42)
266 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt16', 42)
267 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt32', 42)
268 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt32', 42)
269 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt64', 42)
270 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt64', 42)
271 self.assert_method_eq(INTERFACE_TESTS, 42.0, 'IdentityDouble', 42)
273 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
274 Byte('\x02', variant_level=1),
275 Byte('\x03', variant_level=1)],
276 'IdentityArray',
277 Array([Byte('\x01'),
278 Byte('\x02'),
279 Byte('\x03')],
280 signature='v'))
282 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
283 Int32(2, variant_level=1),
284 Int32(3, variant_level=1)],
285 'IdentityArray',
286 Array([Int32(1),
287 Int32(2),
288 Int32(3)],
289 signature='v'))
290 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
291 String(u'b', variant_level=1),
292 String(u'c', variant_level=1)],
293 'IdentityArray',
294 Array([String('a'),
295 String('b'),
296 String('c')],
297 signature='v'))
299 if have_signatures:
300 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
301 Byte('\x02', variant_level=1),
302 Byte('\x03', variant_level=1)],
303 'IdentityArray',
304 ByteArray('\x01\x02\x03'))
305 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
306 Int32(2, variant_level=1),
307 Int32(3, variant_level=1)],
308 'IdentityArray',
309 [Int32(1),
310 Int32(2),
311 Int32(3)])
312 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
313 String(u'b', variant_level=1),
314 String(u'c', variant_level=1)],
315 'IdentityArray',
316 ['a','b','c'])
318 self.assert_method_eq(INTERFACE_TESTS,
319 [Byte(1), Byte(2), Byte(3)],
320 'IdentityByteArray',
321 ByteArray('\x01\x02\x03'))
322 if have_signatures:
323 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
324 self.assert_method_eq(INTERFACE_TESTS, [False,True], 'IdentityBoolArray', [False,True])
325 if have_signatures:
326 self.assert_method_eq(INTERFACE_TESTS, [False,True,True], 'IdentityBoolArray', [0,1,2])
328 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
329 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
330 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
331 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
332 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
333 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
335 if have_signatures:
336 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [1,2,3])
337 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [1,2,3])
338 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [1,2,3])
339 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [1,2,3])
340 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [1,2,3])
341 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [1,2,3])
343 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
344 if have_signatures:
345 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
346 self.assert_method_eq(INTERFACE_TESTS, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
347 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
348 if have_signatures:
349 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [1,2,3])
351 self.assert_InvertMapping_eq(INTERFACE_TESTS, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
353 self.assert_method_eq(INTERFACE_TESTS, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
354 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
355 'Primitize', [String('x', variant_level=1)])
356 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
357 'Primitize', [String('x', variant_level=23)])
358 self.assert_method_eq(INTERFACE_TESTS,
359 Array([String('x', variant_level=1),
360 Byte(1, variant_level=1),
361 Byte(2, variant_level=1)]),
362 'Primitize',
363 Array([String('x'), Byte(1), Byte(2)],
364 signature='v'))
365 self.assert_method_eq(INTERFACE_TESTS,
366 Array([String('x', variant_level=1),
367 Byte(1, variant_level=1),
368 Byte(2, variant_level=1)]),
369 'Primitize',
370 Array([String('x'), Array([Byte(1), Byte(2)])],
371 signature='v'))
372 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', True)
373 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', False)
374 if have_signatures:
375 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', 42)
376 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', 0)
if __name__ == '__main__':
    # FIXME: should be possible to export objects without a bus name
    bus_name = BusName('com.example.Argh')
    # The client object is exported at '/Test'.  ('/Client' was the
    # alternative path, kept behind a permanently disabled switch.)
    client = Client(bus_name, '/Test')
    # Start the tests once the main loop is running.
    gobject.idle_add(client.run_client)

    # ``loop`` must stay a module-level name: the Client methods that end
    # the run call loop.quit().
    loop = gobject.MainLoop()
    logger.info("running...")
    loop.run()
    logger.info("main loop exited.")