2 # -*- coding: utf-8 -*-
4 # Unix SMB/CIFS implementation.
5 # Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
.messaging
import Messaging
22 from unittest
import TestCase
class MessagingTests(TestCase):
    """Tests for the ``Messaging`` class imported from ``samba.messaging``."""

    def get_context(self, *args, **kwargs):
        """Build a ``Messaging`` object for use in a test.

        All positional and keyword arguments are forwarded to ``Messaging``
        unchanged, except that ``messaging_path`` is always overridden
        with ``"."``.

        :return: the newly constructed ``Messaging`` instance.
        """
        kwargs["messaging_path"] = "."
        return Messaging(*args, **kwargs)

    def test_register(self):
        """A handler can be registered and then deregistered again."""
        x = self.get_context()

        def callback(*args, **kwargs):
            # No message is sent in this test, so the handler only needs to
            # be a callable that register()/deregister() will accept.
            pass

        msg_type = x.register(callback)
        x.deregister(callback, msg_type)

    def test_assign_server_id(self):
        """A context built without arguments exposes a 3-tuple ``server_id``."""
        x = self.get_context()
        self.assertIsInstance(x.server_id, tuple)
        self.assertEqual(3, len(x.server_id))

    def test_ping_speed(self):
        """Register handlers on two contexts and send two pings between them.

        NOTE(review): this test makes no assertions; it only checks that
        registering and sending do not raise.
        """
        # The client below addresses its messages to the same (0, 1) tuple
        # that this context is created with.
        server_ctx = self.get_context((0, 1))

        def ping_callback(src, data):
            # Echo the payload back to whoever sent it.
            # NOTE(review): send() is called with two arguments here but with
            # three (destination, message type, data) by the client below --
            # confirm whether a message type is missing.
            server_ctx.send(src, data)

        def exit_callback(*args, **kwargs):
            # Placeholder handler; accepts whatever arguments it is given.
            pass

        msg_ping = server_ctx.register(ping_callback)
        msg_exit = server_ctx.register(exit_callback)

        def pong_callback(*args, **kwargs):
            # Placeholder handler; accepts whatever arguments it is given.
            pass

        client_ctx = self.get_context((0, 2))
        msg_pong = client_ctx.register(pong_callback)

        # One ping with a payload and one with an empty payload.
        client_ctx.send((0, 1), msg_ping, "testing")
        client_ctx.send((0, 1), msg_ping, "")