8 from dbus
import SessionBus
9 from dbus
.service
import BusName
11 from crosstest
import CROSS_TEST_PATH
, CROSS_TEST_BUS_NAME
, \
12 INTERFACE_SINGLE_TESTS
, INTERFACE_TESTS
,\
13 INTERFACE_CALLBACK_TESTS
, INTERFACE_SIGNAL_TESTS
,\
20 class VerboseSet(Set
):
22 print '<%s> ok' % thing
29 tested_things
= VerboseSet()
31 INTERFACE_SINGLE_TESTS
+ '.Sum',
32 INTERFACE_TESTS
+ '.Identity',
33 INTERFACE_TESTS
+ '.IdentityByte',
34 INTERFACE_TESTS
+ '.IdentityBool',
35 INTERFACE_TESTS
+ '.IdentityInt16',
36 INTERFACE_TESTS
+ '.IdentityUInt16',
37 INTERFACE_TESTS
+ '.IdentityInt32',
38 INTERFACE_TESTS
+ '.IdentityUInt32',
39 INTERFACE_TESTS
+ '.IdentityInt64',
40 INTERFACE_TESTS
+ '.IdentityUInt64',
41 INTERFACE_TESTS
+ '.IdentityDouble',
42 INTERFACE_TESTS
+ '.IdentityString',
43 INTERFACE_TESTS
+ '.IdentityArray',
44 INTERFACE_TESTS
+ '.IdentityByteArray',
45 INTERFACE_TESTS
+ '.IdentityBoolArray',
46 INTERFACE_TESTS
+ '.IdentityInt16Array',
47 INTERFACE_TESTS
+ '.IdentityUInt16Array',
48 INTERFACE_TESTS
+ '.IdentityInt32Array',
49 INTERFACE_TESTS
+ '.IdentityUInt32Array',
50 INTERFACE_TESTS
+ '.IdentityInt64Array',
51 INTERFACE_TESTS
+ '.IdentityUInt64Array',
52 INTERFACE_TESTS
+ '.IdentityDoubleArray',
53 INTERFACE_TESTS
+ '.IdentityStringArray',
54 INTERFACE_TESTS
+ '.Sum',
55 INTERFACE_TESTS
+ '.InvertMapping',
56 INTERFACE_TESTS
+ '.DeStruct',
57 INTERFACE_TESTS
+ '.Primitize',
58 INTERFACE_TESTS
+ '.Trigger',
59 INTERFACE_TESTS
+ '.Exit',
60 INTERFACE_TESTS
+ '.Invert',
61 INTERFACE_SIGNAL_TESTS
+ '.Trigger',
65 class SingleTestsImpl(dbus
.service
.Object
):
67 @dbus.service
.method(INTERFACE_SINGLE_TESTS
, 'ay', 'u')
69 tested_things
.add(INTERFACE_SINGLE_TESTS
+ '.Sum')
70 return sum(map(ord, input))
73 class TestsImpl(dbus
.service
.Object
):
def __init__(self, bus_name, service_path, exit_fn):
    """Store the exit callback and initialise the D-Bus service object.

    bus_name     -- forwarded unchanged to dbus.service.Object.__init__
    service_path -- forwarded unchanged to dbus.service.Object.__init__
    exit_fn      -- callable kept as self._exit_fn; the Exit handler later
                    schedules it with gobject.idle_add
    """
    # Keep the callback before handing control to the base initialiser.
    self._exit_fn = exit_fn

    # Call the dbus.service.Object initialiser directly on this instance.
    service_object = dbus.service.Object
    service_object.__init__(self, bus_name, service_path)
79 @dbus.service
.method(INTERFACE_TESTS
, 'v', 'v')
80 def Identity(self
, input):
81 tested_things
.add(INTERFACE_TESTS
+ '.Identity')
84 @dbus.service
.method(INTERFACE_TESTS
, 'y', 'y')
85 def IdentityByte(self
, input):
86 tested_things
.add(INTERFACE_TESTS
+ '.IdentityByte')
89 @dbus.service
.method(INTERFACE_TESTS
, 'b', 'b')
90 def IdentityBool(self
, input):
91 tested_things
.add(INTERFACE_TESTS
+ '.IdentityBool')
94 @dbus.service
.method(INTERFACE_TESTS
, 'n', 'n')
95 def IdentityInt16(self
, input):
96 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt16')
99 @dbus.service
.method(INTERFACE_TESTS
, 'q', 'q')
100 def IdentityUInt16(self
, input):
101 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt16')
104 @dbus.service
.method(INTERFACE_TESTS
, 'i', 'i')
105 def IdentityInt32(self
, input):
106 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt32')
109 @dbus.service
.method(INTERFACE_TESTS
, 'u', 'u')
110 def IdentityUInt32(self
, input):
111 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt32')
114 @dbus.service
.method(INTERFACE_TESTS
, 'x', 'x')
115 def IdentityInt64(self
, input):
116 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt64')
119 @dbus.service
.method(INTERFACE_TESTS
, 't', 't')
120 def IdentityUInt64(self
, input):
121 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt64')
124 @dbus.service
.method(INTERFACE_TESTS
, 'd', 'd')
125 def IdentityDouble(self
, input):
126 tested_things
.add(INTERFACE_TESTS
+ '.IdentityDouble')
129 @dbus.service
.method(INTERFACE_TESTS
, 's', 's')
130 def IdentityString(self
, input):
131 tested_things
.add(INTERFACE_TESTS
+ '.IdentityString')
134 @dbus.service
.method(INTERFACE_TESTS
, 'av', 'av')
135 def IdentityArray(self
, input):
136 tested_things
.add(INTERFACE_TESTS
+ '.IdentityArray')
139 @dbus.service
.method(INTERFACE_TESTS
, 'ay', 'ay')
140 def IdentityByteArray(self
, input):
141 tested_things
.add(INTERFACE_TESTS
+ '.IdentityByteArray')
144 @dbus.service
.method(INTERFACE_TESTS
, 'ab', 'ab')
145 def IdentityBoolArray(self
, input):
146 tested_things
.add(INTERFACE_TESTS
+ '.IdentityBoolArray')
149 @dbus.service
.method(INTERFACE_TESTS
, 'an', 'an')
150 def IdentityInt16Array(self
, input):
151 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt16Array')
154 @dbus.service
.method(INTERFACE_TESTS
, 'aq', 'aq')
155 def IdentityUInt16Array(self
, input):
156 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt16Array')
159 @dbus.service
.method(INTERFACE_TESTS
, 'ai', 'ai')
160 def IdentityInt32Array(self
, input):
161 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt32Array')
164 @dbus.service
.method(INTERFACE_TESTS
, 'au', 'au')
165 def IdentityUInt32Array(self
, input):
166 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt32Array')
169 @dbus.service
.method(INTERFACE_TESTS
, 'ax', 'ax')
170 def IdentityInt64Array(self
, input):
171 tested_things
.add(INTERFACE_TESTS
+ '.IdentityInt64Array')
174 @dbus.service
.method(INTERFACE_TESTS
, 'at', 'at')
175 def IdentityUInt64Array(self
, input):
176 tested_things
.add(INTERFACE_TESTS
+ '.IdentityUInt64Array')
179 @dbus.service
.method(INTERFACE_TESTS
, 'ad', 'ad')
180 def IdentityDoubleArray(self
, input):
181 tested_things
.add(INTERFACE_TESTS
+ '.IdentityDoubleArray')
184 @dbus.service
.method(INTERFACE_TESTS
, 'as', 'as')
185 def IdentityStringArray(self
, input):
186 tested_things
.add(INTERFACE_TESTS
+ '.IdentityStringArray')
189 @dbus.service
.method(INTERFACE_TESTS
, 'ai', 'x')
190 def Sum(self
, input):
191 tested_things
.add(INTERFACE_TESTS
+ '.Sum')
194 @dbus.service
.method(INTERFACE_TESTS
, 'a{ss}', 'a{sas}')
195 def InvertMapping(self
, input):
196 tested_things
.add(INTERFACE_TESTS
+ '.InvertMapping')
197 output
= dbus
.Dictionary({})
198 for k
, v
in input.iteritems():
199 output
.setdefault(v
, []).append(k
)
202 @dbus.service
.method(INTERFACE_TESTS
, '(sun)', 'sun')
203 def DeStruct(self
, input):
204 tested_things
.add(INTERFACE_TESTS
+ '.DeStruct')
207 @dbus.service
.method(INTERFACE_TESTS
, 'v', 'av')
208 def Primitize(self
, input):
209 tested_things
.add(INTERFACE_TESTS
+ '.Primitize')
210 return list(self
.primitivize_helper(input))
212 def primitivize_helper(self
, input):
213 if (isinstance(input, tuple) or isinstance(input, dbus
.Struct
)
214 or isinstance(input, list) or isinstance(input, dbus
.Array
)):
216 for y
in self
.primitivize_helper(x
):
218 elif isinstance(input, dbus
.ByteArray
):
220 yield dbus
.Byte(ord(x
))
221 elif isinstance(input, dict) or isinstance(input, dbus
.Dictionary
):
223 for y
in self
.primitivize_helper(x
):
225 for y
in self
.primitivize_helper(input[x
]):
227 elif isinstance(input, dbus
.Variant
):
228 for x
in self
.primitivize_helper(input()):
233 @dbus.service
.method(INTERFACE_TESTS
, 'b', 'b')
234 def Invert(self
, input):
235 tested_things
.add(INTERFACE_TESTS
+ '.Invert')
@dbus.service.method(INTERFACE_TESTS, 'st', '')
def Trigger(self, object, parameter):
    """D-Bus method (in signature 'st', no return value).

    Logs the request to stderr, records INTERFACE_TESTS + '.Trigger' in
    tested_things, and schedules emit_Triggered_from(object, parameter)
    through gobject.idle_add instead of calling it directly.
    """
    request_msg = 'SERVER: method/signal: client wants me to emit Triggered(%r) from %r\n'
    sys.stderr.write(request_msg % (parameter, object))

    tested_things.add(INTERFACE_TESTS + '.Trigger')

    def emit_when_idle():
        # Return the emitter's result so the idle source sees the same
        # value the original lambda produced.
        return self.emit_Triggered_from(object, parameter)

    gobject.idle_add(emit_when_idle)
def emit_Triggered_from(self, object, parameter):
    """Emit the Triggered signal with `parameter` from the object at path `object`.

    The emitting object is looked up in the module-level `objects` mapping,
    keyed by object path.  A SignalTestsImpl is constructed (and cached in
    `objects`) only when no object is registered for that path yet.

    The previous form, objects.setdefault(object, SignalTestsImpl(...)),
    evaluated its default argument on every call, so a fresh
    SignalTestsImpl was constructed for the path even when an object was
    already cached for it.
    """
    sys.stderr.write('SERVER: method/signal: Emitting Triggered(%r) from %r\n' % (parameter, object))

    try:
        emitter = objects[object]
    except KeyError:
        # First request for this path: create the signal emitter once and
        # remember it for later calls.
        emitter = SignalTestsImpl(dbus.service.BusName(CROSS_TEST_BUS_NAME), object)
        objects[object] = emitter

    emitter.Triggered(parameter)
    sys.stderr.write('SERVER: method/signal: Emitted Triggered\n')
249 @dbus.service
.method(INTERFACE_TESTS
, '', '')
251 sys
.stderr
.write('SERVER: client wants me to Exit\n')
252 tested_things
.add(INTERFACE_TESTS
+ '.Exit')
253 for x
in testable_things
:
254 if x
not in tested_things
:
255 print '<%s> untested' % x
256 sys
.stderr
.write('SERVER: will quit when idle\n')
257 gobject
.idle_add(self
._exit
_fn
)
class Server(SingleTestsImpl, TestsImpl, SignalTestsImpl):
    """Service object combining SingleTestsImpl, TestsImpl and SignalTestsImpl.

    Adds the handler that answers a client's Trigger signal by calling
    Response back on the sender.
    """

    def triggered_by_client(self, parameter1, parameter2, sender, sender_path):
        """Handle TestSignals.Trigger emitted by the client from any object.

        parameter1, parameter2 -- the signal arguments, passed straight back
                                  to the sender's Response method
        sender                 -- bus name of the emitter (sender_keyword)
        sender_path            -- object path of the emitter (path_keyword)
        """
        log = sys.stderr.write
        log('SERVER: signal/callback: Triggered by client (%s:%s): (%r,%r)\n'
            % (sender, sender_path, parameter1, parameter2))

        tested_things.add(INTERFACE_SIGNAL_TESTS + '.Trigger')

        # Build a proxy for the emitting object and reply on the callback
        # interface with the same two parameters.
        session_bus = dbus.SessionBus()
        remote_object = session_bus.get_object(sender, sender_path)
        callback_iface = dbus.Interface(remote_object, INTERFACE_CALLBACK_TESTS)
        callback_iface.Response(parameter1, parameter2)

        log('SERVER: signal/callback: Sent Response\n')
272 if __name__
== '__main__':
274 bus_name
= BusName(CROSS_TEST_BUS_NAME
)
275 loop
= gobject
.MainLoop()
276 obj
= Server(bus_name
, CROSS_TEST_PATH
, loop
.quit
)
277 bus
.add_signal_receiver(obj
.triggered_by_client
,
278 signal_name
='Trigger',
279 dbus_interface
=INTERFACE_SIGNAL_TESTS
,
282 sender_keyword
='sender',
283 path_keyword
='sender_path')
284 objects
[CROSS_TEST_PATH
] = obj
286 sys
.stderr
.write("SERVER: running...\n")
288 sys
.stderr
.write("SERVER: main loop exited.\n")