1 # Copyright (C) 2006 Collabora Ltd. <http://www.collabora.co.uk/>
3 # Licensed under the Academic Free License version 2.1
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU Library General Public License as published by
7 # the Free Software Foundation; either version 2.1 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Library General Public License for more details.
15 # You should have received a copy of the GNU Library General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 from time
import sleep
25 from dbus
import SessionBus
, Interface
, Array
, Byte
, Double
, Boolean
, ByteArray
, Int16
, Int32
, Int64
, UInt16
, UInt32
, UInt64
, String
, UTF8String
, Struct
, Dictionary
26 from dbus
.service
import BusName
29 from crosstest
import CROSS_TEST_PATH
, CROSS_TEST_BUS_NAME
,\
30 INTERFACE_SINGLE_TESTS
, INTERFACE_TESTS
,\
31 INTERFACE_SIGNAL_TESTS
, INTERFACE_CALLBACK_TESTS
,\
# Logging setup: the root logger's level is set to 1, which is below DEBUG (10),
# so records of every standard level pass the level check.
36 logging
.getLogger().setLevel(1)
# Module-level logger used by every method below.
37 logger
= logging
.getLogger('cross-test-client')
# Client side of the D-Bus cross test. Derives from SignalTestsImpl (imported
# elsewhere in this file).
40 class Client(SignalTestsImpl
):
# Shutdown path: every test name still present in self.expected is reported as
# a failure, because its reply never arrived.
# NOTE(review): `s` is built below but no use of it appears in this block, and
# self.fail_id is only read here - confirm it is advanced before each report.
45 for x
in self
.expected
:
47 print "%s fail %d" % (x
, self
.fail_id
)
48 s
= "report %d: reply to %s didn't arrive" % (self
.fail_id
, x
)
# Ask the server to exit. The outcome is delivered to quit_reply_handler or
# quit_error_handler rather than being returned from this call.
51 logger
.info("asking server to Exit")
52 Interface(self
.obj
, INTERFACE_TESTS
).Exit(reply_handler
=self
.quit_reply_handler
, error_handler
=self
.quit_error_handler
)
53 # if the server doesn't reply we'll just exit anyway
# The lambda calls loop.quit() and evaluates to False (element [1] of the
# tuple); a False return from a GLib timeout callback stops it firing again.
# `loop` is the module-level main loop created in the __main__ block.
54 gobject
.timeout_add(1000, lambda: (loop
.quit(), False)[1])
# Reply handler registered on the asynchronous Exit() call: logs that the
# server acknowledged the exit request. Takes no arguments besides self.
56 def quit_reply_handler(self
):
57 logger
.info("server says it will exit")
# Error handler registered on the asynchronous Exit() call.
#   e: the error object handed to the handler.
# NOTE(review): the format string has two %s placeholders but the arguments of
# this logger.error call are not present in the lines below - confirm them
# (trigger_error_handler in this file passes e.__class__ and e to the same
# kind of message).
60 def quit_error_handler(self
, e
):
61 logger
.error("error telling server to quit: %s %s",
# Exported D-Bus method on INTERFACE_CALLBACK_TESTS. The second decorator
# argument 'qd' is the input signature: q = uint16, d = double.
65 @dbus.service
.method(INTERFACE_CALLBACK_TESTS
, 'qd')
# Handles the Response callback.
#   input1, input2: the two values received; the test passes only when they
#   compare equal to (42, 23).
66 def Response(self
, input1
, input2
):
67 logger
.info("signal/callback: Response received (%r,%r)",
# The signal/callback test is no longer outstanding once any Response arrives,
# whether or not the payload turns out to be correct.
69 self
.expected
.discard('%s.Trigger' % INTERFACE_SIGNAL_TESTS
)
70 if (input1
, input2
) != (42, 23):
# Failure report: a "<test> fail <id>" line plus a detail message in `s`.
# NOTE(review): no use of `s` appears in this block - confirm it is emitted.
72 print "%s.Trigger fail %d" % (INTERFACE_SIGNAL_TESTS
, self
.fail_id
)
73 s
= ("report %d: expected (42,23), got %r"
74 % (self
.fail_id
, (input1
, input2
)))
# Success report.
78 print "%s.Trigger pass" % INTERFACE_SIGNAL_TESTS
# Call one remote method and validate its result with a caller-supplied check.
#   interface: D-Bus interface name used to wrap self.obj.
#   check_fn:  callable invoked as check_fn(real_ret, check_arg).
#   check_arg: second argument handed to check_fn (the expectation).
#   member:    name of the method to look up on the Interface wrapper.
#   *args:     positional arguments forwarded to the remote method.
# Prints "<interface>.<member> pass" or "<interface>.<member> fail <id>".
81 def assert_method_matches(self
, interface
, check_fn
, check_arg
, member
, *args
):
82 if_obj
= Interface(self
.obj
, interface
)
# The method is resolved by name, so any member string can be exercised.
83 method
= getattr(if_obj
, member
)
85 real_ret
= method(*args
)
# Report used when the call itself raised: the message includes the repr and
# the str of `e`.
# NOTE(review): the binding of `e` and the use of `s` are not present in the
# lines of this block - confirm the surrounding exception handling.
88 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
89 s
= ("report %d: %s.%s%r: raised %r \"%s\""
90 % (self
.fail_id
, interface
, member
, args
, e
, e
))
# traceback is imported inline via __import__ rather than at module level.
93 __import__('traceback').print_exc()
# Validate the value that came back.
96 check_fn(real_ret
, check_arg
)
# Report used when the check rejected the value: the message is str(e).
99 print "%s.%s fail %d" % (interface
, member
, self
.fail_id
)
100 s
= ("report %d: %s.%s%r: %s"
101 % (self
.fail_id
, interface
, member
, args
, e
))
# Success report.
105 print "%s.%s pass" % (interface
, member
)
# Call `member` on `interface` with *args and require the result to match `ret`.
# The comparison lives in the nested equals() helper, which is passed to
# assert_method_matches as its check function with `ret` as the expectation.
107 def assert_method_eq(self
, interface
, ret
, member
, *args
):
# equals(real_ret, exp): raises AssertionError describing the first mismatch
# between the value received and the value expected.
108 def equals(real_ret
, exp
):
# Mismatch message that also names both classes.
# NOTE(review): the conditions guarding this raise and the next one are not
# present in the lines of this block.
110 raise AssertionError('expected %r of class %s, got %r of class %s' % (exp
, exp
.__class
__, real_ret
, real_ret
.__class
__))
112 raise AssertionError('expected %r, got %r' % (exp
, real_ret
))
# variant_level is compared only when exp is neither a tuple nor None; an exp
# without that attribute counts as level 0.
113 if not isinstance(exp
, (tuple, type(None))):
114 if real_ret
.variant_level
!= getattr(exp
, 'variant_level', 0):
115 raise AssertionError('expected variant_level=%d, got %r with level %d'
116 % (getattr(exp
, 'variant_level', 0), real_ret
,
117 real_ret
.variant_level
))
# Lists and tuples: compare element by element through a recursive equals().
118 if isinstance(exp
, list) or isinstance(exp
, tuple):
119 for i
in xrange(len(exp
)):
121 equals(real_ret
[i
], exp
[i
])
# A failing element gets its position appended to the exception's args so the
# final report shows where in the sequence the mismatch occurred.
122 except AssertionError, e
:
123 if not isinstance(e
.args
, tuple):
125 e
.args
= e
.args
+ ('(at position %d in sequence)' % i
,)
# Dicts: compare value by value for each key k, annotating failures likewise.
# NOTE(review): the loop that binds k is not present in the lines of this
# block - confirm which mapping supplies the keys.
127 elif isinstance(exp
, dict):
130 equals(real_ret
[k
], exp
[k
])
131 except AssertionError, e
:
132 if not isinstance(e
.args
, tuple):
134 e
.args
= e
.args
+ ('(at key %r in dict)' % k
,)
# Delegate the call and the pass/fail reporting.
136 self
.assert_method_matches(interface
, equals
, ret
, member
, *args
)
# Call `member` on `interface` with `mapping` and validate the returned dict
# against `expected` using the nested check() helper.
138 def assert_InvertMapping_eq(self
, interface
, expected
, member
, mapping
):
# check(real_ret, exp): raises AssertionError for a missing key, an unexpected
# key, or a key whose values differ.
# NOTE(review): the loops that bind `key` and the comparison between `got` and
# `wanted` are not present in the lines of this block.
139 def check(real_ret
, exp
):
141 if key
not in real_ret
:
142 raise AssertionError('missing key %r' % key
)
145 raise AssertionError('unexpected key %r' % key
)
# Both sides are converted to plain lists before being compared.
146 got
= list(real_ret
[key
])
147 wanted
= list(exp
[key
])
151 raise AssertionError('expected %r => %r, got %r'
152 % (key
, wanted
, got
))
# Delegate the call and the pass/fail reporting.
153 self
.assert_method_matches(interface
, check
, expected
, member
, mapping
)
155 def triggered_cb(self
, param
, sender_path
):
156 logger
.info("method/signal: Triggered(%r) by %r",
158 self
.expected
.discard('%s.Trigger' % INTERFACE_TESTS
)
159 if sender_path
!= '/Where/Ever':
161 print "%s.Trigger fail %d" % (INTERFACE_TESTS
, self
.fail_id
)
162 s
= ("report %d: expected signal from /Where/Ever, got %r"
163 % (self
.fail_id
, sender_path
))
168 print "%s.Trigger fail %d" % (INTERFACE_TESTS
, self
.fail_id
)
169 s
= ("report %d: expected signal param 42, got %r"
170 % (self
.fail_id
, parameter
))
174 print "%s.Trigger pass" % INTERFACE_TESTS
# Reply handler for the asynchronous Trigger() call made in run_client. Once
# that call has returned, it starts the signal/callback test.
176 def trigger_returned_cb(self
):
177 logger
.info('method/signal: Trigger() returned')
179 logger
.info("signal/callback: Emitting signal to trigger callback")
# Register the test as outstanding before emitting, so a missing Response is
# reported at shutdown.
180 self
.expected
.add('%s.Trigger' % INTERFACE_SIGNAL_TESTS
)
# self.Trigger is not defined in this class; presumably it is inherited from
# SignalTestsImpl - confirm. 42 and 23.0 are the values Response() checks for.
181 self
.Trigger(UInt16(42), 23.0)
182 logger
.info("signal/callback: Emitting signal returned")
# Entry point scheduled from the __main__ block via gobject.idle_add: looks up
# the remote test object, runs the synchronous tests, then starts the
# asynchronous method/signal test.
# NOTE(review): `bus` is used below but its binding is not present in the
# lines of this block - confirm where it comes from.
184 def run_client(self
):
186 obj
= bus
.get_object(CROSS_TEST_BUS_NAME
, CROSS_TEST_PATH
)
189 self
.run_synchronous_tests(obj
)
192 logger
.info("Binding signal handler for Triggered")
193 # FIXME: doesn't seem to work when going via the Interface method
194 # FIXME: should be possible to ask the proxy object for its
# path_keyword='sender_path' makes the emitting object's path arrive as the
# sender_path keyword argument of triggered_cb.
196 bus
.add_signal_receiver(self
.triggered_cb
, 'Triggered',
197 INTERFACE_SIGNAL_TESTS
,
199 path_keyword
='sender_path')
200 logger
.info("method/signal: Triggering signal")
# Register the test as outstanding before triggering, so a missing signal is
# reported at shutdown.
201 self
.expected
.add('%s.Trigger' % INTERFACE_TESTS
)
# Asynchronous call: '/Where/Ever' and 42 are the path and value that
# triggered_cb checks for. The outcome goes to trigger_returned_cb or
# trigger_error_handler.
202 Interface(obj
, INTERFACE_TESTS
).Trigger(u
'/Where/Ever', dbus
.UInt64(42), reply_handler
=self
.trigger_returned_cb
, error_handler
=self
.trigger_error_handler
)
# Error handler for the asynchronous Trigger() call made in run_client.
#   e: the error object handed to the handler; its class and text are logged.
204 def trigger_error_handler(self
, e
):
205 logger
.error("method/signal: %s %s", e
.__class
__, e
)
# Tell the server to exit. No reply/error handlers are passed here, unlike the
# Exit() call in the shutdown path.
206 Interface(self
.obj
, INTERFACE_TESTS
).Exit()
# Battery of blocking method-call checks against the remote test object. Each
# check goes through assert_method_eq / assert_InvertMapping_eq, whose
# arguments are (interface, expected result, method name, *call arguments).
#   obj: the proxy object obtained in run_client.
# NOTE(review): the helpers read self.obj, but the lines of this block do not
# show obj being stored there - confirm.
209 def run_synchronous_tests(self
, obj
):
210 # We can't test that coercion works correctly unless the server has
211 # sent us introspection data. Java doesn't :-/
212 have_signatures
= True
# Sum on INTERFACE_SINGLE_TESTS: the same three byte values supplied as ints,
# one-character strings, Byte objects and a ByteArray must all total 6.
216 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [1, 2, 3])
217 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ['\x01', '\x02', '\x03'])
218 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', [Byte(1), Byte(2), Byte(3)])
219 self
.assert_method_eq(INTERFACE_SINGLE_TESTS
, 6, 'Sum', ByteArray('\x01\x02\x03'))
# Identity: values sent without a variant_level are expected back with
# variant_level=1; a value sent with variant_level=23 is expected back at 23.
222 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', String('foo'))
223 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', UTF8String('foo'))
224 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
225 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=23), 'Identity', Byte(42, variant_level
=23))
226 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', 42.5)
227 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
# Second group of Identity checks, this time including a plain str argument.
230 self
.assert_method_eq(INTERFACE_TESTS
, String(u
'foo', variant_level
=1), 'Identity', 'foo')
231 self
.assert_method_eq(INTERFACE_TESTS
, Byte(42, variant_level
=1), 'Identity', Byte(42))
232 self
.assert_method_eq(INTERFACE_TESTS
, Double(42.5, variant_level
=1), 'Identity', Double(42.5))
233 self
.assert_method_eq(INTERFACE_TESTS
, Double(-42.5, variant_level
=1), 'Identity', -42.5)
# Typed Identity* methods, exercised at the boundaries of each type's range.
235 for i
in (0, 42, 255):
236 self
.assert_method_eq(INTERFACE_TESTS
, Byte(i
), 'IdentityByte', Byte(i
))
237 for i
in (True, False):
238 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityBool', i
)
240 for i
in (-0x8000, 0, 42, 0x7fff):
241 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt16', Int16(i
))
242 for i
in (0, 42, 0xffff):
243 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt16', UInt16(i
))
244 for i
in (-0x7fffffff-1, 0, 42, 0x7fffffff):
245 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt32', Int32(i
))
246 for i
in (0L, 42L, 0xffffffffL
):
247 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt32', UInt32(i
))
# MANY = 0x8000 * 0x10000**3 = 2**63, so the Int64 loop spans
# -2**63 .. 2**63 - 1 and the UInt64 loop tops out at 2**64 - 1.
248 MANY
= 0x8000L
* 0x10000L
* 0x10000L
* 0x10000L
249 for i
in (-MANY
, 0, 42, MANY
-1):
250 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityInt64', Int64(i
))
251 for i
in (0, 42, 2*MANY
- 1):
252 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityUInt64', UInt64(i
))
254 self
.assert_method_eq(INTERFACE_TESTS
, 42.3, 'IdentityDouble', 42.3)
255 for i
in ('', 'foo'):
256 self
.assert_method_eq(INTERFACE_TESTS
, i
, 'IdentityString', i
)
# The same character sent as a unicode object and as a UTF-8 encoded byte
# string; both must come back equal to u'\xa9'.
257 for i
in (u
'\xa9', '\xc2\xa9'):
258 self
.assert_method_eq(INTERFACE_TESTS
, u
'\xa9', 'IdentityString', i
)
# Typed Identity* methods fed plain Python values (a one-character str, ints)
# instead of dbus wrapper types; the expected value is the coerced form.
261 self
.assert_method_eq(INTERFACE_TESTS
, Byte(0x42), 'IdentityByte', '\x42')
262 self
.assert_method_eq(INTERFACE_TESTS
, True, 'IdentityBool', 42)
263 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt16', 42)
264 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt16', 42)
265 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt32', 42)
266 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt32', 42)
267 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityInt64', 42)
268 self
.assert_method_eq(INTERFACE_TESTS
, 42, 'IdentityUInt64', 42)
269 self
.assert_method_eq(INTERFACE_TESTS
, 42.0, 'IdentityDouble', 42)
# Array checks whose expected results are lists of variant_level=1 elements.
# NOTE(review): the method name and call arguments of most of the following
# calls are not present in the lines of this block.
271 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
272 Byte('\x02', variant_level
=1),
273 Byte('\x03', variant_level
=1)],
280 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
281 Int32(2, variant_level
=1),
282 Int32(3, variant_level
=1)],
288 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
289 String(u
'b', variant_level
=1),
290 String(u
'c', variant_level
=1)],
298 self
.assert_method_eq(INTERFACE_TESTS
, [Byte('\x01', variant_level
=1),
299 Byte('\x02', variant_level
=1),
300 Byte('\x03', variant_level
=1)],
302 ByteArray('\x01\x02\x03'))
303 self
.assert_method_eq(INTERFACE_TESTS
, [Int32(1, variant_level
=1),
304 Int32(2, variant_level
=1),
305 Int32(3, variant_level
=1)],
310 self
.assert_method_eq(INTERFACE_TESTS
, [String(u
'a', variant_level
=1),
311 String(u
'b', variant_level
=1),
312 String(u
'c', variant_level
=1)],
316 self
.assert_method_eq(INTERFACE_TESTS
,
317 [Byte(1), Byte(2), Byte(3)],
319 ByteArray('\x01\x02\x03'))
# Typed Identity*Array methods, first with explicitly typed elements.
321 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityByteArray', ['\x01', '\x02', '\x03'])
322 self
.assert_method_eq(INTERFACE_TESTS
, [False,True], 'IdentityBoolArray', [False,True])
# Ints sent to a bool array: 0 -> False, 1 and 2 -> True.
324 self
.assert_method_eq(INTERFACE_TESTS
, [False,True,True], 'IdentityBoolArray', [0,1,2])
326 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [Int16(1),Int16(2),Int16(3)])
327 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [UInt16(1),UInt16(2),UInt16(3)])
328 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [Int32(1),Int32(2),Int32(3)])
329 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [UInt32(1),UInt32(2),UInt32(3)])
330 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [Int64(1),Int64(2),Int64(3)])
331 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [UInt64(1),UInt64(2),UInt64(3)])
# The same array methods fed plain Python ints.
334 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt16Array', [1,2,3])
335 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt16Array', [1,2,3])
336 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt32Array', [1,2,3])
337 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt32Array', [1,2,3])
338 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityInt64Array', [1,2,3])
339 self
.assert_method_eq(INTERFACE_TESTS
, [1,2,3], 'IdentityUInt64Array', [1,2,3])
341 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1.0,2.5,3.1])
# An int element (1) in a double array is expected back as 1.0.
343 self
.assert_method_eq(INTERFACE_TESTS
, [1.0,2.5,3.1], 'IdentityDoubleArray', [1,2.5,3.1])
344 self
.assert_method_eq(INTERFACE_TESTS
, ['a','b','c'], 'IdentityStringArray', ['a','b','c'])
# Sum on INTERFACE_TESTS, with Int32 elements and with plain ints.
345 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [Int32(1),Int32(2),Int32(3)])
347 self
.assert_method_eq(INTERFACE_TESTS
, 6, 'Sum', [1,2,3])
# InvertMapping: a {game: genre} dict must come back as {genre: [games]}.
349 self
.assert_InvertMapping_eq(INTERFACE_TESTS
, {'fps': ['unreal', 'quake'], 'rts': ['warcraft']}, 'InvertMapping', {'unreal': 'fps', 'quake': 'fps', 'warcraft': 'rts'})
# DeStruct: a (string, UInt32, Int16) tuple is expected back as ('a', 1, 2).
351 self
.assert_method_eq(INTERFACE_TESTS
, ('a', 1, 2), 'DeStruct', ('a', UInt32(1), Int16(2)))
# Primitize: elements are expected back at variant_level=1 whether they were
# sent at level 1 or level 23.
352 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
353 'Primitize', [String('x', variant_level
=1)])
354 self
.assert_method_eq(INTERFACE_TESTS
, Array([String('x', variant_level
=1)]),
355 'Primitize', [String('x', variant_level
=23)])
# Flat and nested inputs share the same flat expected result.
# NOTE(review): the method name and the tail of these two calls are not
# present in the lines of this block.
356 self
.assert_method_eq(INTERFACE_TESTS
,
357 Array([String('x', variant_level
=1),
358 Byte(1, variant_level
=1),
359 Byte(2, variant_level
=1)]),
361 Array([String('x'), Byte(1), Byte(2)],
363 self
.assert_method_eq(INTERFACE_TESTS
,
364 Array([String('x', variant_level
=1),
365 Byte(1, variant_level
=1),
366 Byte(2, variant_level
=1)]),
368 Array([String('x'), Array([Byte(1), Byte(2)])],
# Invert: boolean negation, first with real bools...
370 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', True)
371 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', False)
# ...then with ints: 42 is expected to invert to False, 0 to True.
373 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(False), 'Invert', 42)
374 self
.assert_method_eq(INTERFACE_TESTS
, Boolean(True), 'Invert', 0)
# Script entry point: create the client object, schedule its tests and set up
# the GLib main loop.
377 if __name__
== '__main__':
378 # FIXME: should be possible to export objects without a bus name
# Two alternative constructions of the client, at '/Client' and at '/Test'.
# NOTE(review): the condition choosing between them is not present in the
# lines of this block.
380 client
= Client(dbus
.SessionBus(), '/Client')
382 # the Java cross test's interpretation is that the client should be
384 client
= Client(dbus
.SessionBus(), '/Test')
# run_client is queued as an idle callback on the GLib main loop.
385 gobject
.idle_add(client
.run_client
)
# `loop` is a module-level name; the quit timeout lambda in Client refers to
# it as a global.
387 loop
= gobject
.MainLoop()
388 logger
.info("running...")
390 logger
.info("main loop exited.")