# -*- coding: utf-8 -*-
#
# Unix SMB/CIFS implementation.
# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for samba.messaging."""
import os
import random
import time

import samba
from samba.messaging import Messaging
from samba.tests import TestCase
from samba.ndr import ndr_print
from samba.dcerpc import server_id
class MessagingTests(TestCase):
    """Exercise the samba.messaging.Messaging Python binding.

    Every test obtains its messaging contexts through get_context() so that
    they all use the loadparm configuration of the test environment.
    """

    def get_context(self, *args, **kwargs):
        """Return a Messaging context bound to the test environment.

        Positional and keyword arguments are forwarded to Messaging();
        ``lp_ctx`` is always replaced by samba.tests.env_loadparm().
        """
        kwargs['lp_ctx'] = samba.tests.env_loadparm()
        return Messaging(*args, **kwargs)

    def _assert_ping_pong(self, server_addr, client_addr, messages,
                          deadline=1.0):
        """Send pings from a client to a server and assert all are answered.

        :param server_addr: server id passed to get_context() for the
            context that receives the pings.
        :param client_addr: server id passed to get_context() for the
            context that sends the pings and receives the pongs.
        :param messages: sequence of ``(destination, payload)`` pairs; one
            ping is sent per pair.
        :param deadline: seconds to keep pumping both event loops before
            giving up and letting the final assertions fail.
        """
        got_ping = {"count": 0}
        got_pong = {"count": 0}

        server_ctx = self.get_context(server_addr)

        def ping_callback(got_ping, msg_type, src, data):
            got_ping["count"] += 1
            # msg_pong is bound below, before the first ping is sent, so the
            # closure sees the registered pong type when it finally runs.
            server_ctx.send(src, msg_pong, data)

        ping_callback_and_context = (ping_callback, got_ping)
        msg_ping = server_ctx.register(ping_callback_and_context)
        self.addCleanup(server_ctx.deregister,
                        ping_callback_and_context,
                        msg_ping)

        def pong_callback(got_pong, msg_type, src, data):
            got_pong["count"] += 1

        client_ctx = self.get_context(client_addr)
        pong_callback_and_context = (pong_callback, got_pong)
        msg_pong = client_ctx.register(pong_callback_and_context)
        self.addCleanup(client_ctx.deregister,
                        pong_callback_and_context,
                        msg_pong)

        for destination, payload in messages:
            client_ctx.send(destination, msg_ping, payload)

        expected = len(messages)
        start_time = time.time()

        # NOTE WELL: If debugging this with GDB, then the timeout will
        # fire while you are trying to understand it.
        while got_ping["count"] < expected or got_pong["count"] < expected:
            client_ctx.loop_once(0.1)
            server_ctx.loop_once(0.1)
            if time.time() - start_time > deadline:
                # Stop pumping; the assertions below report what is missing.
                break

        self.assertEqual(got_ping["count"], expected)
        self.assertEqual(got_pong["count"], expected)

    def test_register(self):
        """register() hands back an integer message type for a callback."""
        x = self.get_context()

        def callback():
            # Never invoked; only needed as something to register.
            pass

        callback_and_context = (callback, None)
        msg_type = x.register(callback_and_context)
        self.addCleanup(x.deregister, callback_and_context, msg_type)
        self.assertIsInstance(msg_type, int)

    def test_all_servers(self):
        """irpc_all_servers() returns a list."""
        x = self.get_context()
        self.assertIsInstance(x.irpc_all_servers(), list)

    def test_by_name(self):
        """Every name listed by irpc_all_servers() resolves to a list."""
        x = self.get_context()
        for entry in x.irpc_all_servers():
            self.assertIsInstance(x.irpc_servers_byname(entry.name), list)

    def test_unknown_name(self):
        """Looking up a name nobody registered raises KeyError."""
        x = self.get_context()
        self.assertRaises(KeyError,
                          x.irpc_servers_byname,
                          "samba.messaging test NONEXISTING")

    def test_assign_server_id(self):
        """A context exposes its identity as a server_id.server_id."""
        x = self.get_context()
        self.assertIsInstance(x.server_id, server_id.server_id)

    def test_add_remove_name(self):
        """A name can be registered, looked up and removed again."""
        x = self.get_context()
        # Randomised so concurrent or repeated runs do not collide.
        name = "samba.messaging test-%d" % random.randint(1, 1000000)
        # The name has to be registered first, otherwise the lookup below
        # raises KeyError instead of returning this context's server id.
        x.irpc_add_name(name)
        name_list = x.irpc_servers_byname(name)
        self.assertEqual(len(name_list), 1)
        self.assertEqual(ndr_print(x.server_id),
                         ndr_print(name_list[0]))
        x.irpc_remove_name(name)
        self.assertRaises(KeyError,
                          x.irpc_servers_byname, name)

    def test_ping_speed(self):
        """Two pings to server (0, 1) are each answered with a pong."""
        # Try both server_id forms (structure and tuple)
        # NOTE(review): both sends here use the tuple form; confirm whether
        # a structure-form send was intended as well.
        self._assert_ping_pong(
            (0, 1),
            (0, 2),
            [
                ((0, 1), "testing"),
                ((0, 1), "testing2"),
            ],
        )

    def test_pid_defaulting(self):
        """One- and two-element server id tuples reach the same server.

        The server is addressed once as ``(pid, 1)`` and once as ``(1,)``;
        presumably the binding fills a missing pid with the current
        process id -- confirm against the samba.messaging implementation.
        """
        pid = os.getpid()
        # Try one and two element tuple forms
        self._assert_ping_pong(
            (pid, 1),
            (2,),
            [
                ((pid, 1), "testing"),
                ((1,), "testing2"),
            ],
        )