Update my email address
[wifi-radar.git] / test / unit / pubsub.py
blobca74278a58e6873b18e42d4fd6295bef3d25266a
1 # test.pubsub - tests for publish/subscribe classes
3 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
5 # Copyright (C) 2014 Sean Robinson <robinson@tuxfamily.org>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; version 2 of the License.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License in LICENSE.GPL for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to:
18 # Free Software Foundation, Inc.
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
23 from multiprocessing import Pipe
24 import unittest
26 import mock
28 import wifiradar.pubsub as pubsub
class TestLimitedDispatcher(unittest.TestCase):
    """ Subscription bookkeeping on a Dispatcher built with auto_start=False.

    Per the original note on this class, the Dispatcher used here does not
    process messages; only its subscriber tables are inspected.
    """

    def setUp(self):
        """ Build a fresh, non-started Dispatcher for each test. """
        self.dispatch = pubsub.Dispatcher(auto_start=False)

    def tearDown(self):
        """ Close the Dispatcher created in setUp. """
        self.dispatch.close()

    def test_subscribe(self):
        """ Check that one subscribe() call leaves one entry in pipes. """
        # Hold on to the returned object for the length of the test, exactly
        # as the original did, so its lifetime is unchanged.
        subscriber = self.dispatch.subscribe()
        # Exactly one subscriber is expected in the Dispatcher.
        self.assertEqual(1, len(self.dispatch.pipes))

    def test_unsubscribe(self):
        """ Check that unsubscribe() drops the entry and closes both ends. """
        # Two Mocks stand in for the two ends of a multiprocessing.Pipe.
        near_end, far_end = mock.Mock(), mock.Mock()

        # Register the fake pair by hand instead of calling subscribe().
        self.dispatch.pipes[near_end] = ['TOPIC', 'EXIT']
        self.dispatch._pairs[far_end] = near_end
        # Exactly one subscriber is expected in the Dispatcher.
        self.assertEqual(1, len(self.dispatch.pipes))

        self.dispatch.unsubscribe(far_end)
        # No subscribers should remain in the Dispatcher.
        self.assertEqual(0, len(self.dispatch.pipes))
        # Each end of the fake Pipe must have been closed exactly once.
        for pipe_end in (near_end, far_end):
            pipe_end.close.assert_called_once_with()
class TestDispatcher(unittest.TestCase):
    """ Message delivery through a Dispatcher built with default arguments. """

    # Timeout, in seconds, passed to every poll() call in this class.
    POLL_WAIT = 0.25

    def setUp(self):
        """ Build a fresh Dispatcher for each test. """
        self.dispatch = pubsub.Dispatcher()

    def tearDown(self):
        """ Close the Dispatcher created in setUp. """
        self.dispatch.close()

    def _assert_delivered(self, subscriber, expected):
        """ Check that subscriber's next message matches expected.

        Polls first, then receives, then compares topic and details.
        """
        self.assertEqual(True, subscriber.poll(self.POLL_WAIT))
        received = subscriber.recv()
        self.assertEqual(expected.topic, received.topic)
        self.assertEqual(expected.details, received.details)

    def _assert_idle(self, subscriber):
        """ Check that subscriber has nothing waiting within POLL_WAIT. """
        self.assertEqual(False, subscriber.poll(self.POLL_WAIT))

    def test_simple_msg(self):
        """ Send one message and check the other subscriber gets it. """
        sender = self.dispatch.subscribe('TEST')
        receiver = self.dispatch.subscribe('TEST')
        greeting = pubsub.Message('TEST', 'Hello')

        sender.send(greeting)

        self._assert_delivered(receiver, greeting)
        # Subscribers do not get a copy of their own messages.
        self._assert_idle(sender)

    def test_multi_msg(self):
        """ Send three topics and check each subscriber's share of them. """
        first_only = self.dispatch.subscribe('TEST01')
        second_only = self.dispatch.subscribe('TEST02')
        both_topics = self.dispatch.subscribe(('TEST01', 'TEST02'))
        no_topics = self.dispatch.subscribe()

        first_msg = pubsub.Message('TEST01', 'Hello')
        second_msg = pubsub.Message('TEST02', 'Hello')
        third_msg = pubsub.Message('TEST03', 'Hello')

        publisher = self.dispatch.subscribe()
        for outgoing in (first_msg, second_msg, third_msg):
            publisher.send(outgoing)

        # 'TEST01' subscriber: one message, then nothing more waiting.
        self._assert_delivered(first_only, first_msg)
        self._assert_idle(first_only)

        # 'TEST02' subscriber: one message, then nothing more waiting.
        self._assert_delivered(second_only, second_msg)
        self._assert_idle(second_only)

        # Two messages expected here, one for each subscribed topic.
        self._assert_delivered(both_topics, first_msg)
        self._assert_delivered(both_topics, second_msg)
        self._assert_idle(both_topics)

        # This subscriber should not have received any messages.
        self._assert_idle(no_topics)

        # Subscribers do not get a copy of their own messages.
        self._assert_idle(publisher)
class TestConnectors(unittest.TestCase):
    """ Linking two Dispatchers with connectors and with pubsub.bridge. """

    def setUp(self):
        """ Build the two Dispatchers that each test links together. """
        self.local_dispatch = pubsub.Dispatcher()
        self.foreign_dispatch = pubsub.Dispatcher()

    def tearDown(self):
        """ Close both Dispatchers, local first. """
        self.local_dispatch.close()
        self.foreign_dispatch.close()

    def _assert_pipe_counts(self, expected):
        """ Check that both Dispatchers hold expected entries in pipes.

        The local Dispatcher is checked before the foreign one.
        """
        for dispatcher in (self.local_dispatch, self.foreign_dispatch):
            self.assertEqual(expected, len(dispatcher.pipes))

    def test_connector(self):
        """ Pass a message from one Dispatcher to the other over a Pipe. """
        greeting = pubsub.Message('TEST', 'Hello')
        # Both Dispatchers start with no subscribers.
        self._assert_pipe_counts(0)

        # Join the two Dispatchers with the two ends of one Pipe.
        local_end, foreign_end = Pipe()
        self.local_dispatch.add_connector(local_end)
        self.foreign_dispatch.add_connector(foreign_end)
        # Each Dispatcher now counts the connector as one subscriber.
        self._assert_pipe_counts(1)

        # Subscribe to 'TEST' in both Dispatchers.
        local_sub = self.local_dispatch.subscribe('TEST')
        foreign_sub = self.foreign_dispatch.subscribe('TEST')
        # Each Dispatcher now has two subscribers.
        self._assert_pipe_counts(2)

        # Send on the local side...
        local_sub.send(greeting)
        # ...and expect the same message on the foreign side.
        self.assertEqual(True, foreign_sub.poll(0.25))
        received = foreign_sub.recv()
        self.assertEqual(greeting.topic, received.topic)
        self.assertEqual(greeting.details, received.details)

        # Remove the link between the two Dispatchers.
        self.local_dispatch.remove_connector(local_end)
        self.foreign_dispatch.remove_connector(foreign_end)
        # Unsubscribe from both Dispatchers.
        self.local_dispatch.unsubscribe(local_sub)
        self.foreign_dispatch.unsubscribe(foreign_sub)
        # Both Dispatchers are back to no subscribers.
        self._assert_pipe_counts(0)

    def test_bridge(self):
        """ Link the Dispatchers with pubsub.bridge, then unlink them. """
        # Both Dispatchers start with no subscribers.
        self._assert_pipe_counts(0)

        # Create the link...
        local_end, foreign_end = pubsub.bridge(
            self.local_dispatch, self.foreign_dispatch)
        # ...which shows up as one subscriber in each Dispatcher.
        self._assert_pipe_counts(1)

        # Remove the link.
        self.local_dispatch.remove_connector(local_end)
        self.foreign_dispatch.remove_connector(foreign_end)
        # Both Dispatchers are back to no subscribers.
        self._assert_pipe_counts(0)