1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Licensed under the Academic Free License version 2.1
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program; if not, write to the Free Software
19 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 from time
import sleep
27 from dbus
import SessionBus
, Interface
, Array
, Byte
, Double
, Boolean
, ByteArray
, Int16
, Int32
, Int64
, UInt16
, UInt32
, UInt64
, String
, UTF8String
, Struct
, Dictionary
28 from dbus
.service
import BusName
31 from crosstest
import CROSS_TEST_PATH
, CROSS_TEST_BUS_NAME
,\
32 INTERFACE_SINGLE_TESTS
, INTERFACE_TESTS
,\
33 INTERFACE_SIGNAL_TESTS
, INTERFACE_CALLBACK_TESTS
,\
# Let every record through: 1 is below DEBUG (10), the lowest named level.
logging.getLogger().setLevel(1)
# Logger used by the client callbacks in this script.
logger = logging.getLogger('cross-test-client')
42 class Client(SignalTestsImpl
):
47 for x
in self
.expected
:
49 print "%s fail %d" % (x
, self
.fail_id
)
50 s
= "report %d: reply to %s didn't arrive" % (self
.fail_id
, x
)
53 logger
.info("asking server to Exit")
54 Interface(self
.obj
, INTERFACE_TESTS
).Exit(reply_handler
=self
.quit_reply_handler
, error_handler
=self
.quit_error_handler
)
55 # if the server doesn't reply we'll just exit anyway
56 gobject
.timeout_add(1000, lambda: (loop
.quit(), False)[1])
def quit_reply_handler(self):
    """Reply handler for the asynchronous ``Exit`` call.

    The server has acknowledged the request, so stop the main loop right
    away instead of waiting for the fallback timeout.
    """
    logger.info("server says it will exit")
    # `loop` is the module-level main loop created by the script entry point.
    loop.quit()
def quit_error_handler(self, e):
    """Error handler for the asynchronous ``Exit`` call.

    Logs the class and text of the error `e`, then stops the main loop:
    there is nothing further to wait for once the request has failed.
    """
    logger.error("error telling server to quit: %s %s", e.__class__, e)
    # `loop` is the module-level main loop created by the script entry point.
    loop.quit()
@dbus.service.method(INTERFACE_CALLBACK_TESTS, 'qd')
def Response(self, input1, input2):
    """Exported callback that completes the signal/callback test.

    Exported on INTERFACE_CALLBACK_TESTS with input signature 'qd'
    (uint16, double). The arguments must equal (42, 23), the values sent
    with the Trigger signal in trigger_returned_cb. Prints a pass line on
    success; on failure prints a fail line plus a numbered report, and
    logs the report as an error.
    """
    logger.info("signal/callback: Response received (%r,%r)", input1, input2)
    test_name = '%s.Trigger' % INTERFACE_SIGNAL_TESTS
    # The reply has arrived, whatever it contains, so it is no longer pending.
    self.expected.discard(test_name)

    # NOTE(review): this looks like the last step of the asynchronous
    # sequence, yet nothing here ends the run -- confirm whether the
    # client's shutdown routine should be invoked after reporting.
    if (input1, input2) == (42, 23):
        print("%s pass" % test_name)
        return

    # Give each failure its own id so the fail line and report can be paired.
    self.fail_id += 1
    print("%s fail %d" % (test_name, self.fail_id))
    s = ("report %d: expected (42,23), got %r"
         % (self.fail_id, (input1, input2)))
    print(s)
    logger.error(s)
83 def assert_method_matches(self
, interface
, check_fn
, check_arg
, member
, *args
):
84 if_obj
= Interface(self
.obj
, interface
)
85 method
= getattr(if_obj
, member
)
87 real_ret
= method(*args
)
90 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
91 s
= ("report %d: %s.%s%r: raised %r \"%s\""
92 % (self
.fail_id
, interface
, member
, args
, e
, e
))
95 __import__('traceback').print_exc()
98 check_fn(real_ret
, check_arg
)
101 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
102 s
= ("report %d: %s.%s%r: %s"
103 % (self
.fail_id
, interface
, member
, args
, e
))
107 print "%s.%s pass" % (interface
, member
)
109 def assert_method_eq(self
, interface
, ret
, member
, *args
):
110 def equals(real_ret
, exp
):
112 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp
, exp
.__class
__, real_ret
, real_ret
.__class
__))
114 raise AssertionError('expected %r, got %r' % (exp
, real_ret
))
115 if not isinstance(exp
, (tuple, type(None))):
116 if real_ret
.variant_level
!= getattr(exp
, 'variant_level', 0):
117 raise AssertionError('expected variant_level=%d, got %r with level %d'
118 % (getattr(exp
, 'variant_level', 0), real_ret
,
119 real_ret
.variant_level
))
120 if isinstance(exp
, list) or isinstance(exp
, tuple):
121 for i
in xrange(len(exp
)):
123 equals(real_ret
[i
], exp
[i
])
124 except AssertionError, e
:
125 if not isinstance(e
.args
, tuple):
127 e
.args
= e
.args
+ ('(at position %d in sequence)' % i
,)
129 elif isinstance(exp
, dict):
132 equals(real_ret
[k
], exp
[k
])
133 except AssertionError, e
:
134 if not isinstance(e
.args
, tuple):
136 e
.args
= e
.args
+ ('(at key %r in dict)' % k
,)
138 self
.assert_method_matches(interface
, equals
, ret
, member
, *args
)
140 def assert_InvertMapping_eq(self
, interface
, expected
, member
, mapping
):
141 def check(real_ret
, exp
):
143 if key
not in real_ret
:
144 raise AssertionError('missing key %r' % key
)
147 raise AssertionError('unexpected key %r' % key
)
148 got
= list(real_ret
[key
])
149 wanted
= list(exp
[key
])
153 raise AssertionError('expected %r => %r, got %r'
154 % (key
, wanted
, got
))
155 self
.assert_method_matches(interface
, check
, expected
, member
, mapping
)
def triggered_cb(self, param, sender_path):
    """Signal handler for ``Triggered``; completes the method/signal test.

    `param` is the signal's argument and must be 42. `sender_path` is the
    object path the signal came from (delivered through
    ``path_keyword='sender_path'`` at registration) and must be
    '/Where/Ever'. Prints a pass line on success; on failure prints a fail
    line plus a numbered report, and logs the report as an error.
    """
    logger.info("method/signal: Triggered(%r) by %r", param, sender_path)
    test_name = '%s.Trigger' % INTERFACE_TESTS
    # The signal has arrived, whatever it carries, so it is no longer pending.
    self.expected.discard(test_name)

    # One-element tuples keep '%' formatting correct even if a value is
    # itself a tuple.
    if sender_path != '/Where/Ever':
        problem = "expected signal from /Where/Ever, got %r" % (sender_path,)
    elif param != 42:
        problem = "expected signal param 42, got %r" % (param,)
    else:
        print("%s pass" % test_name)
        return

    # Give each failure its own id so the fail line and report can be paired.
    self.fail_id += 1
    print("%s fail %d" % (test_name, self.fail_id))
    s = "report %d: %s" % (self.fail_id, problem)
    print(s)
    logger.error(s)
def trigger_returned_cb(self):
    """Reply handler for the asynchronous ``Trigger`` method call.

    Starts the signal/callback test: records that a reply is expected,
    then calls ``self.Trigger`` (not defined in this part of the file;
    presumably the signal emitter inherited from the base class) with
    the values that ``Response`` later checks for.
    """
    logger.info('method/signal: Trigger() returned')

    logger.info("signal/callback: Emitting signal to trigger callback")
    pending = '%s.Trigger' % INTERFACE_SIGNAL_TESTS
    self.expected.add(pending)
    self.Trigger(UInt16(42), 23.0)
    logger.info("signal/callback: Emitting signal returned")
def run_client(self):
    """Run the client side of the cross test.

    Scheduled from the script entry point with ``gobject.idle_add``.
    Runs the synchronous tests, then starts the method/signal test by
    subscribing to ``Triggered`` and calling ``Trigger`` asynchronously.
    The rest of the sequence continues in the reply and signal callbacks.
    """
    bus = SessionBus()
    obj = bus.get_object(CROSS_TEST_BUS_NAME, CROSS_TEST_PATH)
    # Kept on the instance: the Exit calls and assert_method_matches read
    # self.obj.
    self.obj = obj

    self.run_synchronous_tests(obj)

    logger.info("Binding signal handler for Triggered")
    # FIXME: doesn't seem to work when going via the Interface method
    # FIXME: should be possible to ask the proxy object for its bus name
    # The bus name is passed positionally so that only signals sent by the
    # server under test are delivered to the handler.
    bus.add_signal_receiver(self.triggered_cb, 'Triggered',
                            INTERFACE_SIGNAL_TESTS,
                            CROSS_TEST_BUS_NAME,
                            path_keyword='sender_path')
    logger.info("method/signal: Triggering signal")
    self.expected.add('%s.Trigger' % INTERFACE_TESTS)
    Interface(obj, INTERFACE_TESTS).Trigger(
        u'/Where/Ever', UInt64(42),
        reply_handler=self.trigger_returned_cb,
        error_handler=self.trigger_error_handler)
def trigger_error_handler(self, e):
    """Error handler for the asynchronous ``Trigger`` method call.

    Logs the class and text of the error `e`, then asks the server to
    exit. This ``Exit`` call is made without reply or error handlers.
    """
    logger.error("method/signal: %s %s", e.__class__, e)
    tests = Interface(self.obj, INTERFACE_TESTS)
    tests.Exit()
211 def run_synchronous_tests(self
, obj
):
212 # We can't test that coercion works correctly unless the server has
213 # sent us introspection data. Java doesn't :-/
214 have_signatures
= True
218 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [1, 2, 3])
219 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ['\x01', '\x02', '\x03'])
220 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
221 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ByteArray('\x01\x02\x03'))
224 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', String('foo'))
225 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', UTF8String('foo'))
226 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
227 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=23), 'Identity', Byte(42, variant_level
=23))
228 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', 42.5)
229 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
232 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', 'foo')
233 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
234 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', Double(42.5))
235 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
237 for i
in (0, 42, 255):
238 self
.assert_method_eq(INTERFACE_TESTS
, Byte(i
), 'IdentityByte', Byte(i
))
239 for i
in (True, False):
240 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityBool', i
)
242 for i
in (-0x8000, 0, 42, 0x7fff):
243 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt16', Int16(i
))
244 for i
in (0, 42, 0xffff):
245 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt16', UInt16(i
))
246 for i
in (-0x7fffffff-1, 0, 42, 0x7fffffff):
247 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt32', Int32(i
))
248 for i
in (0L, 42L, 0xffffffffL
):
249 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt32', UInt32(i
))
250 MANY
= 0x8000L
* 0x10000L
* 0x10000L
* 0x10000L
251 for i
in (-MANY
, 0, 42, MANY
-1):
252 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt64', Int64(i
))
253 for i
in (0, 42, 2*MANY
- 1):
254 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt64', UInt64(i
))
256 self
.assert_method_eq(INTERFACE_TESTS
, 42.3, 'IdentityDouble', 42.3)
257 for i
in ('', 'foo'):
258 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityString', i
)
259 for i
in (u
'\xa9', '\xc2\xa9'):
260 self
.assert_method_eq(INTERFACE_TESTS
, u
'\xa9', 'IdentityString', i
)
263 self
.assert_method_eq(INTERFACE_TESTS
, Byte(0x42), 'IdentityByte', '\x42')
264 self
.assert_method_eq(INTERFACE_TESTS
, True, 'IdentityBool', 42)
265 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt16', 42)
266 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt16', 42)
267 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt32', 42)
268 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt32', 42)
269 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt64', 42)
270 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt64', 42)
271 self
.assert_method_eq(INTERFACE_TESTS
, 42.0, 'IdentityDouble', 42)
273 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
274 Byte('\x02', variant_level
=1),
275 Byte('\x03', variant_level
=1)],
282 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
283 Int32(2, variant_level
=1),
284 Int32(3, variant_level
=1)],
290 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
291 String(u
'b', variant_level
=1),
292 String(u
'c', variant_level
=1)],
300 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
301 Byte('\x02', variant_level
=1),
302 Byte('\x03', variant_level
=1)],
304 ByteArray('\x01\x02\x03'))
305 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
306 Int32(2, variant_level
=1),
307 Int32(3, variant_level
=1)],
312 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
313 String(u
'b', variant_level
=1),
314 String(u
'c', variant_level
=1)],
318 self
.assert_method_eq(INTERFACE_TESTS
,
319 [Byte(1), Byte(2), Byte(3)],
321 ByteArray('\x01\x02\x03'))
323 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
324 self
.assert_method_eq(INTERFACE_TESTS
, [False,True], 'IdentityBoolArray', [False,True])
326 self
.assert_method_eq(INTERFACE_TESTS
, [False,True,True], 'IdentityBoolArray', [0,1,2])
328 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
329 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
330 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
331 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
332 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
333 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
336 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [1,2,3])
337 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [1,2,3])
338 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [1,2,3])
339 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [1,2,3])
340 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [1,2,3])
341 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [1,2,3])
343 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
345 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
346 self
.assert_method_eq(INTERFACE_TESTS
, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
347 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
349 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [1,2,3])
351 self
.assert_InvertMapping_eq(INTERFACE_TESTS
, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
353 self
.assert_method_eq(INTERFACE_TESTS
, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
354 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
355 'Primitize', [String('x', variant_level
=1)])
356 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
357 'Primitize', [String('x', variant_level
=23)])
358 self
.assert_method_eq(INTERFACE_TESTS
,
359 Array([String('x', variant_level
=1),
360 Byte(1, variant_level
=1),
361 Byte(2, variant_level
=1)]),
363 Array([String('x'), Byte(1), Byte(2)],
365 self
.assert_method_eq(INTERFACE_TESTS
,
366 Array([String('x', variant_level
=1),
367 Byte(1, variant_level
=1),
368 Byte(2, variant_level
=1)]),
370 Array([String('x'), Array([Byte(1), Byte(2)])],
372 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', True)
373 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', False)
375 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', 42)
376 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', 0)
if __name__ == '__main__':
    # FIXME: should be possible to export objects without a bus name
    bus_name = BusName('com.example.Argh')
    # NOTE(review): two Client objects are constructed and the first binding
    # is replaced immediately; only the '/Test' instance is driven below.
    # Confirm whether the '/Client' export is still wanted.
    client = Client(bus_name, '/Client')
    client = Client(bus_name, '/Test')
    # Start the tests from inside the main loop, once it is running.
    gobject.idle_add(client.run_client)

    # Module-level on purpose: the Client callbacks end the run through
    # loop.quit().
    loop = gobject.MainLoop()
    logger.info("running...")
    # Blocks until something calls loop.quit(); without this the idle
    # callback above would never be dispatched.
    loop.run()
    logger.info("main loop exited.")