dbus-python.pc.in: Use -I${includedir} to allow libdbus and dbus-python to be in...
[dbus-python-phuang.git] / test / test-service.py
blob8f588c893dcae9a6e408b8fd80b014df3dd92262
1 #!/usr/bin/env python
3 # Copyright (C) 2004 Red Hat Inc. <http://www.redhat.com/>
4 # Copyright (C) 2005, 2006 Collabora Ltd. <http://www.collabora.co.uk/>
6 # Licensed under the Academic Free License version 2.1
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 2 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 import sys
23 import os
24 import logging
26 builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"])
27 pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"])
29 import dbus
31 if not dbus.__file__.startswith(pydir):
32 raise Exception("DBus modules are not being picked up from the package")
34 import dbus.service
35 import dbus.glib
36 import gobject
37 import random
39 from dbus.gobject_service import ExportedGObject
42 logging.basicConfig(filename=builddir + '/test/test-service.log', filemode='w')
43 logging.getLogger().setLevel(1)
44 logger = logging.getLogger('test-service')
47 NAME = "org.freedesktop.DBus.TestSuitePythonService"
48 IFACE = "org.freedesktop.DBus.TestSuiteInterface"
49 OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject"
51 class RemovableObject(dbus.service.Object):
52 # Part of test for https://bugs.freedesktop.org/show_bug.cgi?id=10457
53 def __init__(self, bus_name, object_path=OBJECT + '/RemovableObject'):
54 super(RemovableObject, self).__init__(bus_name, object_path)
56 @dbus.service.method(IFACE, in_signature='', out_signature='b')
57 def IsThere(self):
58 return True
60 @dbus.service.method(IFACE, in_signature='', out_signature='b')
61 def RemoveSelf(self):
62 self.unexport()
63 return True
65 class TestGObject(ExportedGObject):
66 def __init__(self, bus_name, object_path=OBJECT + '/GObject'):
67 super(TestGObject, self).__init__(bus_name, object_path)
69 @dbus.service.method(IFACE)
70 def Echo(self, arg):
71 return arg
73 class TestInterface(dbus.service.Interface):
74 @dbus.service.method(IFACE, in_signature='', out_signature='b')
75 def CheckInheritance(self):
76 return False
78 class TestObject(dbus.service.Object, TestInterface):
79 def __init__(self, bus_name, object_path=OBJECT):
80 dbus.service.Object.__init__(self, bus_name, object_path)
81 self._removables = []
83 """ Echo whatever is sent
84 """
85 @dbus.service.method(IFACE)
86 def Echo(self, arg):
87 return arg
89 @dbus.service.method(IFACE, in_signature='s', out_signature='s')
90 def AcceptUnicodeString(self, foo):
91 assert isinstance(foo, unicode), (foo, foo.__class__.__mro__)
92 return foo
94 @dbus.service.method(IFACE, in_signature='s', out_signature='s', utf8_strings=True)
95 def AcceptUTF8String(self, foo):
96 assert isinstance(foo, str), (foo, foo.__class__.__mro__)
97 return foo
99 @dbus.service.method(IFACE, in_signature='', out_signature='soss',
100 sender_keyword='sender', path_keyword='path',
101 destination_keyword='dest', message_keyword='msg')
102 def MethodExtraInfoKeywords(self, sender=None, path=None, dest=None,
103 msg=None):
104 return (sender, path, dest,
105 msg.__class__.__module__ + '.' + msg.__class__.__name__)
107 @dbus.service.method(IFACE, in_signature='ay', out_signature='ay')
108 def AcceptListOfByte(self, foo):
109 assert isinstance(foo, list), (foo, foo.__class__.__mro__)
110 return foo
112 @dbus.service.method(IFACE, in_signature='ay', out_signature='ay', byte_arrays=True)
113 def AcceptByteArray(self, foo):
114 assert isinstance(foo, str), (foo, foo.__class__.__mro__)
115 return foo
117 @dbus.service.method(IFACE)
118 def GetComplexArray(self):
119 ret = []
120 for i in range(0,100):
121 ret.append((random.randint(0,100), random.randint(0,100), str(random.randint(0,100))))
123 return dbus.Array(ret, signature="(uus)")
125 def returnValue(self, test):
126 if test == 0:
127 return ""
128 elif test == 1:
129 return "",""
130 elif test == 2:
131 return "","",""
132 elif test == 3:
133 return []
134 elif test == 4:
135 return {}
136 elif test == 5:
137 return ["",""]
138 elif test == 6:
139 return ["","",""]
141 @dbus.service.method(IFACE, in_signature='u', out_signature='s')
142 def ReturnOneString(self, test):
143 return self.returnValue(test)
145 @dbus.service.method(IFACE, in_signature='u', out_signature='ss')
146 def ReturnTwoStrings(self, test):
147 return self.returnValue(test)
149 @dbus.service.method(IFACE, in_signature='u', out_signature='(ss)')
150 def ReturnStruct(self, test):
151 logger.info('ReturnStruct(%r) -> %r', test, self.returnValue(test))
152 return self.returnValue(test)
154 @dbus.service.method(IFACE, in_signature='u', out_signature='as')
155 def ReturnArray(self, test):
156 return self.returnValue(test)
158 @dbus.service.method(IFACE, in_signature='u', out_signature='a{ss}')
159 def ReturnDict(self, test):
160 return self.returnValue(test)
162 @dbus.service.signal(IFACE, signature='s')
163 def SignalOneString(self, test):
164 logger.info('SignalOneString(%r)', test)
166 @dbus.service.signal(IFACE, signature='ss')
167 def SignalTwoStrings(self, test, test2):
168 logger.info('SignalTwoStrings(%r, %r)', test, test2)
170 @dbus.service.signal(IFACE, signature='(ss)')
171 def SignalStruct(self, test):
172 logger.info('SignalStruct(%r)', test)
174 @dbus.service.signal(IFACE, signature='as')
175 def SignalArray(self, test):
176 pass
178 @dbus.service.signal(IFACE, signature='a{ss}')
179 def SignalDict(self, test):
180 pass
182 @dbus.service.method(IFACE, in_signature='su', out_signature='')
183 def EmitSignal(self, signal, value):
184 sig = getattr(self, str(signal), None)
185 assert(sig != None)
187 val = self.returnValue(value)
188 # make two string case work by passing arguments in by tuple
189 if (signal == 'SignalTwoStrings' and (value == 1 or value == 5)):
190 val = tuple(val)
191 else:
192 val = tuple([val])
194 logger.info('Emitting %s with %r', signal, val)
195 sig(*val)
197 def CheckInheritance(self):
198 return True
200 @dbus.service.method(IFACE, in_signature='', out_signature='b')
201 def TestListExportedChildObjects(self):
202 objs_root = session_bus.list_exported_child_objects('/')
203 assert objs_root == ['org'], objs_root
204 objs_org = session_bus.list_exported_child_objects('/org')
205 assert objs_org == ['freedesktop'], objs_org
206 return True
208 @dbus.service.method(IFACE, in_signature='bbv', out_signature='v', async_callbacks=('return_cb', 'error_cb'))
209 def AsynchronousMethod(self, async, fail, variant, return_cb, error_cb):
210 try:
211 if async:
212 gobject.timeout_add(500, self.AsynchronousMethod, False, fail, variant, return_cb, error_cb)
213 return
214 else:
215 if fail:
216 raise RuntimeError
217 else:
218 return_cb(variant)
220 return False # do not run again
221 except Exception, e:
222 error_cb(e)
224 @dbus.service.method(IFACE, in_signature='', out_signature='s', sender_keyword='sender')
225 def WhoAmI(self, sender):
226 return sender
228 @dbus.service.method(IFACE, in_signature='', out_signature='b')
229 def AddRemovableObject(self):
230 # Part of test for https://bugs.freedesktop.org/show_bug.cgi?id=10457
231 # Keep the removable object reffed, since that's the use case for this
232 self._removables.append(RemovableObject(global_name))
233 return True
235 @dbus.service.method(IFACE, in_signature='', out_signature='b')
236 def HasRemovableObject(self):
237 # Part of test for https://bugs.freedesktop.org/show_bug.cgi?id=10457
238 objs = session_bus.list_exported_child_objects(OBJECT)
239 return ('RemovableObject' in objs)
241 @dbus.service.method(IFACE)
242 def MultipleReturnWithoutSignature(self):
243 # https://bugs.freedesktop.org/show_bug.cgi?id=10174
244 return dbus.String('abc'), dbus.Int32(123)
246 session_bus = dbus.SessionBus()
247 global_name = dbus.service.BusName(NAME, bus=session_bus)
248 object = TestObject(global_name)
249 g_object = TestGObject(global_name)
250 loop = gobject.MainLoop()
251 loop.run()