From 77f19ef18864f3468b7373bd75461aad3239fe52 Mon Sep 17 00:00:00 2001 From: Simon McVittie Date: Mon, 30 Apr 2007 13:38:50 +0100 Subject: [PATCH] test/test-p2p.py: Added. Test "peer-to-peer" connections. (Actually tested by connecting to the bus daemon, because I haven't implemented a Python binding for DBusServer yet.) --- test/Makefile.am | 1 + test/run-test.sh | 3 ++ test/test-p2p.py | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 test/test-p2p.py diff --git a/test/Makefile.am b/test/Makefile.am index 58e4da8..ef2d529 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -5,6 +5,7 @@ EXTRA_DIST = \ run-test.sh \ run-with-tmp-session-bus.sh \ test-client.py \ + test-p2p.py \ test-service.py \ test-signals.py \ test-standalone.py \ diff --git a/test/run-test.sh b/test/run-test.sh index 6058afa..2661d3b 100755 --- a/test/run-test.sh +++ b/test/run-test.sh @@ -89,6 +89,9 @@ $PYTHON "$DBUS_TOP_SRCDIR"/test/test-client.py || die "test-client.py failed" echo "running test-signals.py" $PYTHON "$DBUS_TOP_SRCDIR"/test/test-signals.py || die "test-signals.py failed" +echo "running test-p2p.py" +$PYTHON "$DBUS_TOP_SRCDIR"/test/test-p2p.py || die "... failed" + rm -f "$DBUS_TOP_BUILDDIR"/test/test-service.log rm -f "$DBUS_TOP_BUILDDIR"/test/cross-client.log rm -f "$DBUS_TOP_BUILDDIR"/test/cross-server.log diff --git a/test/test-p2p.py b/test/test-p2p.py new file mode 100644 index 0000000..11b44e8 --- /dev/null +++ b/test/test-p2p.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python + +# Copyright (C) 2004 Red Hat Inc. +# Copyright (C) 2005, 2006 Collabora Ltd. +# +# Licensed under the Academic Free License version 2.1 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +import sys +import os +import unittest +import time +import logging +import weakref + +builddir = os.path.normpath(os.environ["DBUS_TOP_BUILDDIR"]) +pydir = os.path.normpath(os.environ["DBUS_TOP_SRCDIR"]) + +import dbus +import _dbus_bindings +import gobject +import dbus.glib +import dbus.service + + +logging.basicConfig() +logging.getLogger().setLevel(1) + + +NAME = "org.freedesktop.DBus.TestSuitePythonService" +IFACE = "org.freedesktop.DBus.TestSuiteInterface" +OBJECT = "/org/freedesktop/DBus/TestSuitePythonObject" + +class TestDBusBindings(unittest.TestCase): + # This test case relies on the test service already having been activated. + + def get_conn_and_unique(self): + # since I haven't implemented servers yet, I'll use the bus daemon + # as our peer - note that there's no name tracking because we're not + # using dbus.bus.BusConnection! + conn = dbus.connection.Connection( + os.environ['DBUS_SESSION_BUS_ADDRESS']) + unique = conn.call_blocking('org.freedesktop.DBus', + '/org/freedesktop/DBus', + 'org.freedesktop.DBus', 'Hello', + '', (), utf8_strings=True) + self.assert_(unique.__class__ == dbus.UTF8String, repr(unique)) + self.assert_(unique.startswith(':'), unique) + conn.set_unique_name(unique) + return conn, unique + + def testCall(self): + conn, unique = self.get_conn_and_unique() + ret = conn.call_blocking(NAME, OBJECT, IFACE, 'Echo', 'v', ('V',)) + self.assertEquals(ret, 'V') + + def testCallThroughProxy(self): + conn, unique = self.get_conn_and_unique() + proxy = conn.get_object(NAME, OBJECT) + iface = dbus.Interface(proxy, IFACE) + ret = iface.Echo('V') + self.assertEquals(ret, 'V') + + def testSetUniqueName(self): + conn, unique = self.get_conn_and_unique() + ret = conn.call_blocking(NAME, OBJECT, IFACE, + 'MethodExtraInfoKeywords', '', (), + utf8_strings=True) + self.assertEquals(ret, (unique, OBJECT, NAME, + 'dbus.lowlevel.MethodCallMessage')) + + +if __name__ == '__main__': + gobject.threads_init() + dbus.glib.init_threads() + + unittest.main() -- 2.11.4.GIT