* dbus.service.Object: don't let the user try to export objects on the local
[dbus-python-phuang.git] / test / cross-test-client.py
blob781b401325458c69bc86a096c79ea37ab250148f
1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Licensed under the Academic Free License version 2.1
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU Library General Public License as published by
7 # the Free Software Foundation; either version 2.1 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Library General Public License for more details.
15 # You should have received a copy of the GNU Library General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 from sets import Set
20 from time import sleep
21 import logging
23 import gobject
25 from dbus import SessionBus, Interface, Array, Byte, Double, Boolean, ByteArray, Int16, Int32, Int64, UInt16, UInt32, UInt64, String, UTF8String, Struct, Dictionary
26 from dbus.service import BusName
27 import dbus.glib
29 from crosstest import CROSS_TEST_PATH, CROSS_TEST_BUS_NAME,\
30 INTERFACE_SINGLE_TESTS, INTERFACE_TESTS,\
31 INTERFACE_SIGNAL_TESTS, INTERFACE_CALLBACK_TESTS,\
32 SignalTestsImpl
# Install the default handler on the root logger.
logging.basicConfig()
# Logger used by everything in this script.
logger = logging.getLogger('cross-test-client')
# Level 1 is below DEBUG (10), so no record is filtered out by level.
logging.getLogger().setLevel(1)
40 class Client(SignalTestsImpl):
41 fail_id = 0
42 expected = Set()
44 def quit(self):
45 for x in self.expected:
46 self.fail_id += 1
47 print "%s fail %d" % (x, self.fail_id)
48 s = "report %d: reply to %s didn't arrive" % (self.fail_id, x)
49 print s
50 logger.error(s)
51 logger.info("asking server to Exit")
52 Interface(self.obj, INTERFACE_TESTS).Exit(reply_handler=self.quit_reply_handler, error_handler=self.quit_error_handler)
53 # if the server doesn't reply we'll just exit anyway
54 gobject.timeout_add(1000, lambda: (loop.quit(), False)[1])
56 def quit_reply_handler(self):
57 logger.info("server says it will exit")
58 loop.quit()
60 def quit_error_handler(self, e):
61 logger.error("error telling server to quit: %s %s",
62 e.__class__, e)
63 loop.quit()
65 @dbus.service.method(INTERFACE_CALLBACK_TESTS, 'qd')
66 def Response(self, input1, input2):
67 logger.info("signal/callback: Response received (%r,%r)",
68 input1, input2)
69 self.expected.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
70 if (input1, input2) != (42, 23):
71 self.fail_id += 1
72 print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS, self.fail_id)
73 s = ("report %d: expected (42,23), got %r"
74 % (self.fail_id, (input1, input2)))
75 logger.error(s)
76 print s
77 else:
78 print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
79 self.quit()
81 def assert_method_matches(self, interface, check_fn, check_arg, member, *args):
82 if_obj = Interface(self.obj, interface)
83 method = getattr(if_obj, member)
84 try:
85 real_ret = method(*args)
86 except Exception, e:
87 self.fail_id += 1
88 print "%s.%s fail %d" % (interface, member, self.fail_id)
89 s = ("report %d: %s.%s%r: raised %r \"%s\""
90 % (self.fail_id, interface, member, args, e, e))
91 print s
92 logger.error(s)
93 __import__('traceback').print_exc()
94 return
95 try:
96 check_fn(real_ret, check_arg)
97 except Exception, e:
98 self.fail_id += 1
99 print "%s.%s fail %d" % (interface, member, self.fail_id)
100 s = ("report %d: %s.%s%r: %s"
101 % (self.fail_id, interface, member, args, e))
102 print s
103 logger.error(s)
104 return
105 print "%s.%s pass" % (interface, member)
107 def assert_method_eq(self, interface, ret, member, *args):
108 def equals(real_ret, exp):
109 if real_ret != exp:
110 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp, exp.__class__, real_ret, real_ret.__class__))
111 if real_ret != exp:
112 raise AssertionError('expected %r, got %r' % (exp, real_ret))
113 if not isinstance(exp, (tuple, type(None))):
114 if real_ret.variant_level != getattr(exp, 'variant_level', 0):
115 raise AssertionError('expected variant_level=%d, got %r with level %d'
116 % (getattr(exp, 'variant_level', 0), real_ret,
117 real_ret.variant_level))
118 if isinstance(exp, list) or isinstance(exp, tuple):
119 for i in xrange(len(exp)):
120 try:
121 equals(real_ret[i], exp[i])
122 except AssertionError, e:
123 if not isinstance(e.args, tuple):
124 e.args = (e.args,)
125 e.args = e.args + ('(at position %d in sequence)' % i,)
126 raise e
127 elif isinstance(exp, dict):
128 for k in exp:
129 try:
130 equals(real_ret[k], exp[k])
131 except AssertionError, e:
132 if not isinstance(e.args, tuple):
133 e.args = (e.args,)
134 e.args = e.args + ('(at key %r in dict)' % k,)
135 raise e
136 self.assert_method_matches(interface, equals, ret, member, *args)
138 def assert_InvertMapping_eq(self, interface, expected, member, mapping):
139 def check(real_ret, exp):
140 for key in exp:
141 if key not in real_ret:
142 raise AssertionError('missing key %r' % key)
143 for key in real_ret:
144 if key not in exp:
145 raise AssertionError('unexpected key %r' % key)
146 got = list(real_ret[key])
147 wanted = list(exp[key])
148 got.sort()
149 wanted.sort()
150 if got != wanted:
151 raise AssertionError('expected %r => %r, got %r'
152 % (key, wanted, got))
153 self.assert_method_matches(interface, check, expected, member, mapping)
155 def triggered_cb(self, param, sender_path):
156 logger.info("method/signal: Triggered(%r) by %r",
157 param, sender_path)
158 self.expected.discard('%s.Trigger' % INTERFACE_TESTS)
159 if sender_path != '/Where/Ever':
160 self.fail_id += 1
161 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
162 s = ("report %d: expected signal from /Where/Ever, got %r"
163 % (self.fail_id, sender_path))
164 print s
165 logger.error(s)
166 elif param != 42:
167 self.fail_id += 1
168 print "%s.Trigger fail %d" % (INTERFACE_TESTS, self.fail_id)
169 s = ("report %d: expected signal param 42, got %r"
170 % (self.fail_id, parameter))
171 print s
172 logger.error(s)
173 else:
174 print "%s.Trigger pass" % INTERFACE_TESTS
176 def trigger_returned_cb(self):
177 logger.info('method/signal: Trigger() returned')
178 # Callback tests
179 logger.info("signal/callback: Emitting signal to trigger callback")
180 self.expected.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS)
181 self.Trigger(UInt16(42), 23.0)
182 logger.info("signal/callback: Emitting signal returned")
184 def run_client(self):
185 bus = SessionBus()
186 obj = bus.get_object(CROSS_TEST_BUS_NAME, CROSS_TEST_PATH)
187 self.obj = obj
189 self.run_synchronous_tests(obj)
191 # Signal tests
192 logger.info("Binding signal handler for Triggered")
193 # FIXME: doesn't seem to work when going via the Interface method
194 # FIXME: should be possible to ask the proxy object for its
195 # bus name
196 bus.add_signal_receiver(self.triggered_cb, 'Triggered',
197 INTERFACE_SIGNAL_TESTS,
198 CROSS_TEST_BUS_NAME,
199 path_keyword='sender_path')
200 logger.info("method/signal: Triggering signal")
201 self.expected.add('%s.Trigger' % INTERFACE_TESTS)
202 Interface(obj, INTERFACE_TESTS).Trigger(u'/Where/Ever', dbus.UInt64(42), reply_handler=self.trigger_returned_cb, error_handler=self.trigger_error_handler)
204 def trigger_error_handler(self, e):
205 logger.error("method/signal: %s %s", e.__class__, e)
206 Interface(self.obj, INTERFACE_TESTS).Exit()
207 self.quit()
209 def run_synchronous_tests(self, obj):
210 # We can't test that coercion works correctly unless the server has
211 # sent us introspection data. Java doesn't :-/
212 have_signatures = True
214 # "Single tests"
215 if have_signatures:
216 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [1, 2, 3])
217 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ['\x01', '\x02', '\x03'])
218 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
219 self.assert_method_eq(INTERFACE_SINGLE_TESTS, 6, 'Sum', ByteArray('\x01\x02\x03'))
221 # Main tests
222 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', String('foo'))
223 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', UTF8String('foo'))
224 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
225 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=23), 'Identity', Byte(42, variant_level=23))
226 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', 42.5)
227 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
229 if have_signatures:
230 self.assert_method_eq(INTERFACE_TESTS, String(u'foo', variant_level=1), 'Identity', 'foo')
231 self.assert_method_eq(INTERFACE_TESTS, Byte(42, variant_level=1), 'Identity', Byte(42))
232 self.assert_method_eq(INTERFACE_TESTS, Double(42.5, variant_level=1), 'Identity', Double(42.5))
233 self.assert_method_eq(INTERFACE_TESTS, Double(-42.5, variant_level=1), 'Identity', -42.5)
235 for i in (0, 42, 255):
236 self.assert_method_eq(INTERFACE_TESTS, Byte(i), 'IdentityByte', Byte(i))
237 for i in (True, False):
238 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityBool', i)
240 for i in (-0x8000, 0, 42, 0x7fff):
241 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt16', Int16(i))
242 for i in (0, 42, 0xffff):
243 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt16', UInt16(i))
244 for i in (-0x7fffffff-1, 0, 42, 0x7fffffff):
245 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt32', Int32(i))
246 for i in (0L, 42L, 0xffffffffL):
247 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt32', UInt32(i))
248 MANY = 0x8000L * 0x10000L * 0x10000L * 0x10000L
249 for i in (-MANY, 0, 42, MANY-1):
250 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityInt64', Int64(i))
251 for i in (0, 42, 2*MANY - 1):
252 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityUInt64', UInt64(i))
254 self.assert_method_eq(INTERFACE_TESTS, 42.3, 'IdentityDouble', 42.3)
255 for i in ('', 'foo'):
256 self.assert_method_eq(INTERFACE_TESTS, i, 'IdentityString', i)
257 for i in (u'\xa9', '\xc2\xa9'):
258 self.assert_method_eq(INTERFACE_TESTS, u'\xa9', 'IdentityString', i)
260 if have_signatures:
261 self.assert_method_eq(INTERFACE_TESTS, Byte(0x42), 'IdentityByte', '\x42')
262 self.assert_method_eq(INTERFACE_TESTS, True, 'IdentityBool', 42)
263 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt16', 42)
264 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt16', 42)
265 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt32', 42)
266 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt32', 42)
267 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityInt64', 42)
268 self.assert_method_eq(INTERFACE_TESTS, 42, 'IdentityUInt64', 42)
269 self.assert_method_eq(INTERFACE_TESTS, 42.0, 'IdentityDouble', 42)
271 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
272 Byte('\x02', variant_level=1),
273 Byte('\x03', variant_level=1)],
274 'IdentityArray',
275 Array([Byte('\x01'),
276 Byte('\x02'),
277 Byte('\x03')],
278 signature='v'))
280 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
281 Int32(2, variant_level=1),
282 Int32(3, variant_level=1)],
283 'IdentityArray',
284 Array([Int32(1),
285 Int32(2),
286 Int32(3)],
287 signature='v'))
288 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
289 String(u'b', variant_level=1),
290 String(u'c', variant_level=1)],
291 'IdentityArray',
292 Array([String('a'),
293 String('b'),
294 String('c')],
295 signature='v'))
297 if have_signatures:
298 self.assert_method_eq(INTERFACE_TESTS, [Byte('\x01', variant_level=1),
299 Byte('\x02', variant_level=1),
300 Byte('\x03', variant_level=1)],
301 'IdentityArray',
302 ByteArray('\x01\x02\x03'))
303 self.assert_method_eq(INTERFACE_TESTS, [Int32(1, variant_level=1),
304 Int32(2, variant_level=1),
305 Int32(3, variant_level=1)],
306 'IdentityArray',
307 [Int32(1),
308 Int32(2),
309 Int32(3)])
310 self.assert_method_eq(INTERFACE_TESTS, [String(u'a', variant_level=1),
311 String(u'b', variant_level=1),
312 String(u'c', variant_level=1)],
313 'IdentityArray',
314 ['a','b','c'])
316 self.assert_method_eq(INTERFACE_TESTS,
317 [Byte(1), Byte(2), Byte(3)],
318 'IdentityByteArray',
319 ByteArray('\x01\x02\x03'))
320 if have_signatures:
321 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
322 self.assert_method_eq(INTERFACE_TESTS, [False,True], 'IdentityBoolArray', [False,True])
323 if have_signatures:
324 self.assert_method_eq(INTERFACE_TESTS, [False,True,True], 'IdentityBoolArray', [0,1,2])
326 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
327 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
328 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
329 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
330 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
331 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
333 if have_signatures:
334 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt16Array', [1,2,3])
335 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt16Array', [1,2,3])
336 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt32Array', [1,2,3])
337 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt32Array', [1,2,3])
338 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityInt64Array', [1,2,3])
339 self.assert_method_eq(INTERFACE_TESTS, [1,2,3], 'IdentityUInt64Array', [1,2,3])
341 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
342 if have_signatures:
343 self.assert_method_eq(INTERFACE_TESTS, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
344 self.assert_method_eq(INTERFACE_TESTS, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
345 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
346 if have_signatures:
347 self.assert_method_eq(INTERFACE_TESTS, 6, 'Sum', [1,2,3])
349 self.assert_InvertMapping_eq(INTERFACE_TESTS, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
351 self.assert_method_eq(INTERFACE_TESTS, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
352 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
353 'Primitize', [String('x', variant_level=1)])
354 self.assert_method_eq(INTERFACE_TESTS, Array([String('x', variant_level=1)]),
355 'Primitize', [String('x', variant_level=23)])
356 self.assert_method_eq(INTERFACE_TESTS,
357 Array([String('x', variant_level=1),
358 Byte(1, variant_level=1),
359 Byte(2, variant_level=1)]),
360 'Primitize',
361 Array([String('x'), Byte(1), Byte(2)],
362 signature='v'))
363 self.assert_method_eq(INTERFACE_TESTS,
364 Array([String('x', variant_level=1),
365 Byte(1, variant_level=1),
366 Byte(2, variant_level=1)]),
367 'Primitize',
368 Array([String('x'), Array([Byte(1), Byte(2)])],
369 signature='v'))
370 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', True)
371 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', False)
372 if have_signatures:
373 self.assert_method_eq(INTERFACE_TESTS, Boolean(False), 'Invert', 42)
374 self.assert_method_eq(INTERFACE_TESTS, Boolean(True), 'Invert', 0)
if __name__ == '__main__':
    # FIXME: should be possible to export objects without a bus name
    bus_name = BusName('com.example.Argh')
    # The Java cross test's interpretation is that the client should be
    # at /Test too, so the object is exported there and not at '/Client'.
    client = Client(bus_name, '/Test')
    # Queue the test run; it starts once the main loop below is running.
    gobject.idle_add(client.run_client)

    # Client's quit paths stop the main loop through this module-level name.
    loop = gobject.MainLoop()
    logger.info("running...")
    loop.run()
    logger.info("main loop exited.")