s3:smbd: fix NULL dereference in case of readlink failure
[Samba.git] / python / samba / tests / messaging.py
blob0cadd0d6bf10d72dd93d7a9c7f0f6e73c28d7445
1 # -*- coding: utf-8 -*-
3 # Unix SMB/CIFS implementation.
4 # Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 """Tests for samba.messaging."""
21 import samba
22 from samba.messaging import Messaging
23 from samba.tests import TestCase
24 import time
25 from samba.ndr import ndr_print
26 from samba.dcerpc import server_id
27 import random
28 import os
31 class MessagingTests(TestCase):
33 def get_context(self, *args, **kwargs):
34 kwargs['lp_ctx'] = samba.tests.env_loadparm()
35 return Messaging(*args, **kwargs)
37 def test_register(self):
38 x = self.get_context()
40 def callback():
41 pass
42 callback_and_context = (callback, None)
43 msg_type = x.register(callback_and_context)
44 self.addCleanup(x.deregister, callback_and_context, msg_type)
45 self.assertTrue(isinstance(msg_type, int))
47 def test_all_servers(self):
48 x = self.get_context()
49 self.assertTrue(isinstance(x.irpc_all_servers(), list))
51 def test_by_name(self):
52 x = self.get_context()
53 for name in x.irpc_all_servers():
54 self.assertTrue(isinstance(x.irpc_servers_byname(name.name), list))
56 def test_unknown_name(self):
57 x = self.get_context()
58 self.assertRaises(KeyError,
59 x.irpc_servers_byname, "samba.messaging test NONEXISTING")
61 def test_assign_server_id(self):
62 x = self.get_context()
63 self.assertTrue(isinstance(x.server_id, server_id.server_id))
65 def test_add_remove_name(self):
66 x = self.get_context()
67 name = "samba.messaging test-%d" % random.randint(1, 1000000)
68 x.irpc_add_name(name)
69 name_list = x.irpc_servers_byname(name)
70 self.assertEqual(len(name_list), 1)
71 self.assertEqual(ndr_print(x.server_id),
72 ndr_print(name_list[0]))
73 x.irpc_remove_name(name)
74 self.assertRaises(KeyError,
75 x.irpc_servers_byname, name)
77 def test_ping_speed(self):
78 got_ping = {"count": 0}
79 got_pong = {"count": 0}
80 timeout = False
82 msg_pong = 0
83 msg_ping = 0
85 server_ctx = self.get_context((0, 1))
87 def ping_callback(got_ping, msg_type, src, data):
88 got_ping["count"] += 1
89 server_ctx.send(src, msg_pong, data)
91 ping_callback_and_context = (ping_callback, got_ping)
92 msg_ping = server_ctx.register(ping_callback_and_context)
93 self.addCleanup(server_ctx.deregister,
94 ping_callback_and_context,
95 msg_ping)
97 def pong_callback(got_pong, msg_type, src, data):
98 got_pong["count"] += 1
100 client_ctx = self.get_context((0, 2))
101 pong_callback_and_context = (pong_callback, got_pong)
102 msg_pong = client_ctx.register(pong_callback_and_context)
103 self.addCleanup(client_ctx.deregister,
104 pong_callback_and_context,
105 msg_pong)
107 # Try both server_id forms (structure and tuple)
108 client_ctx.send((0, 1), msg_ping, "testing")
110 client_ctx.send((0, 1), msg_ping, "testing2")
112 start_time = time.time()
114 # NOTE WELL: If debugging this with GDB, then the timeout will
115 # fire while you are trying to understand it.
117 while (got_ping["count"] < 2 or got_pong["count"] < 2) and not timeout:
118 client_ctx.loop_once(0.1)
119 server_ctx.loop_once(0.1)
120 if time.time() - start_time > 1:
121 timeout = True
123 self.assertEqual(got_ping["count"], 2)
124 self.assertEqual(got_pong["count"], 2)
126 def test_pid_defaulting(self):
127 got_ping = {"count": 0}
128 got_pong = {"count": 0}
129 timeout = False
131 msg_pong = 0
132 msg_ping = 0
134 pid = os.getpid()
135 server_ctx = self.get_context((pid, 1))
137 def ping_callback(got_ping, msg_type, src, data):
138 got_ping["count"] += 1
139 server_ctx.send(src, msg_pong, data)
141 ping_callback_and_context = (ping_callback, got_ping)
142 msg_ping = server_ctx.register(ping_callback_and_context)
143 self.addCleanup(server_ctx.deregister,
144 ping_callback_and_context,
145 msg_ping)
147 def pong_callback(got_pong, msg_type, src, data):
148 got_pong["count"] += 1
150 client_ctx = self.get_context((2,))
151 pong_callback_and_context = (pong_callback, got_pong)
152 msg_pong = client_ctx.register(pong_callback_and_context)
153 self.addCleanup(client_ctx.deregister,
154 pong_callback_and_context,
155 msg_pong)
157 # Try one and two element tuple forms
158 client_ctx.send((pid, 1), msg_ping, "testing")
160 client_ctx.send((1,), msg_ping, "testing2")
162 start_time = time.time()
164 # NOTE WELL: If debugging this with GDB, then the timeout will
165 # fire while you are trying to understand it.
167 while (got_ping["count"] < 2 or got_pong["count"] < 2) and not timeout:
168 client_ctx.loop_once(0.1)
169 server_ctx.loop_once(0.1)
170 if time.time() - start_time > 1:
171 timeout = True
173 self.assertEqual(got_ping["count"], 2)
174 self.assertEqual(got_pong["count"], 2)