Make Dispatcher launch its own thread for its event loop
[wifi-radar.git] / test / unit / pubsub.py
blob3a8e2a0305789d8fc92abc07fcc62cd2e27b62e1
1 # test.pubsub - tests for publish/subscribe classes
3 # Part of WiFi Radar: A utility for managing WiFi profiles on GNU/Linux.
5 # Copyright (C) 2014 Sean Robinson <seankrobinson@gmail.com>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; version 2 of the License.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License in LICENSE.GPL for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 from multiprocessing import Pipe
23 import unittest
25 import mock
27 import wifiradar.pubsub as pubsub
class TestLimitedDispatcher(unittest.TestCase):
    """ Exercise a Dispatcher whose start() method is never called. """

    def setUp(self):
        self.dispatch = pubsub.Dispatcher()

    def tearDown(self):
        self.dispatch.close()

    def _assert_subscriber_count(self, expected):
        """ Check how many entries the Dispatcher holds in `pipes`. """
        self.assertEqual(expected, len(self.dispatch.pipes))

    def test_subscribe(self):
        """ Test subscribe method. """
        # Keep a reference to the returned object while the count is checked.
        subscriber = self.dispatch.subscribe()
        # Exactly one subscriber should now be registered.
        self._assert_subscriber_count(1)

    def test_unsubscribe(self):
        """ Test unsubscribe method. """
        # Stand-ins for the two ends of a multiprocessing.Pipe.
        dispatcher_end = mock.Mock()
        subscriber_end = mock.Mock()
        # Register the fake pair by hand instead of calling subscribe().
        self.dispatch.pipes[dispatcher_end] = ['TOPIC', 'EXIT']
        self.dispatch._pairs[subscriber_end] = dispatcher_end
        # Exactly one subscriber should now be registered.
        self._assert_subscriber_count(1)

        self.dispatch.unsubscribe(subscriber_end)
        # The Dispatcher should have no subscribers left.
        self._assert_subscriber_count(0)
        # Each end of the Pipe must have been closed exactly once.
        for pipe_end in (dispatcher_end, subscriber_end):
            pipe_end.close.assert_called_once_with()
class TestDispatcher(unittest.TestCase):
    """ Exercise a Dispatcher that has been started with start(). """

    def setUp(self):
        self.dispatch = pubsub.Dispatcher()
        self.dispatch.start()

    def tearDown(self):
        self.dispatch.close()
        self.dispatch.join()

    def _assert_delivers(self, subscriber, message):
        """ Check that `message` is the next item waiting on `subscriber`. """
        self.assertEqual(True, subscriber.poll(0.25))
        self.assertEqual(message, subscriber.recv())

    def _assert_nothing_waiting(self, subscriber):
        """ Check that `subscriber` has nothing to read within the timeout. """
        self.assertEqual(False, subscriber.poll(0.25))

    def test_simple_msg(self):
        """ Test sending and receiving a message. """
        sender = self.dispatch.subscribe('TEST')
        receiver = self.dispatch.subscribe('TEST')
        greeting = pubsub.Message('TEST', 'Hello')
        sender.send(greeting)
        self._assert_delivers(receiver, greeting)
        # A subscriber is not sent a copy of its own message.
        self._assert_nothing_waiting(sender)

    def test_multi_msg(self):
        """ Test sending and receiving multiple messages. """
        first_only = self.dispatch.subscribe('TEST01')
        second_only = self.dispatch.subscribe('TEST02')
        first_and_second = self.dispatch.subscribe(('TEST01', 'TEST02'))
        no_topics = self.dispatch.subscribe()

        first, second, third = [pubsub.Message(topic, 'Hello')
                                for topic in ('TEST01', 'TEST02', 'TEST03')]

        publisher = self.dispatch.subscribe()
        for message in (first, second, third):
            publisher.send(message)

        # Single-topic subscribers each see only their own topic.
        self._assert_delivers(first_only, first)
        self._assert_nothing_waiting(first_only)
        self._assert_delivers(second_only, second)
        self._assert_nothing_waiting(second_only)
        # The two-topic subscriber sees both messages, in send order.
        self._assert_delivers(first_and_second, first)
        self._assert_delivers(first_and_second, second)
        self._assert_nothing_waiting(first_and_second)
        # A subscriber with no topics should not have received anything.
        self._assert_nothing_waiting(no_topics)

        # A subscriber is not sent a copy of its own messages.
        self._assert_nothing_waiting(publisher)
class TestConnectors(unittest.TestCase):
    """ Exercise links between two started Dispatchers. """

    def setUp(self):
        self.local_dispatch = pubsub.Dispatcher()
        self.local_dispatch.start()

        self.foreign_dispatch = pubsub.Dispatcher()
        self.foreign_dispatch.start()

    def tearDown(self):
        dispatchers = (self.local_dispatch, self.foreign_dispatch)
        # Close both Dispatchers first, then wait for each of them.
        for dispatcher in dispatchers:
            dispatcher.close()
        for dispatcher in dispatchers:
            dispatcher.join()

    def _assert_pipe_counts(self, expected):
        """ Check that each Dispatcher holds `expected` entries in `pipes`. """
        self.assertEqual(expected, len(self.local_dispatch.pipes))
        self.assertEqual(expected, len(self.foreign_dispatch.pipes))

    def test_connector(self):
        """ Test sending through a chain of Dispatchers. """
        greeting = pubsub.Message('TEST', 'Hello')
        # Neither Dispatcher has a subscriber yet.
        self._assert_pipe_counts(0)
        # Join the two Dispatchers with a Pipe, one end each.
        local_end, foreign_end = Pipe()
        self.local_dispatch.add_connector(local_end)
        self.foreign_dispatch.add_connector(foreign_end)
        # The connector counts as one subscriber on each side.
        self._assert_pipe_counts(1)
        # Subscribe to 'TEST' on both sides.
        local_sub = self.local_dispatch.subscribe('TEST')
        foreign_sub = self.foreign_dispatch.subscribe('TEST')
        # Each Dispatcher now has two subscribers.
        self._assert_pipe_counts(2)
        # Send on the local side...
        local_sub.send(greeting)
        # ...and expect the message on the foreign side.
        self.assertEqual(True, foreign_sub.poll(0.25))
        self.assertEqual(greeting, foreign_sub.recv())
        # Take down the link between the two Dispatchers.
        self.local_dispatch.remove_connector(local_end)
        self.foreign_dispatch.remove_connector(foreign_end)
        # Drop the 'TEST' subscriptions as well.
        self.local_dispatch.unsubscribe(local_sub)
        self.foreign_dispatch.unsubscribe(foreign_sub)
        # Both Dispatchers are back to zero subscribers.
        self._assert_pipe_counts(0)

    def test_bridge(self):
        """ Test the bridge function. """
        # Neither Dispatcher has a subscriber yet.
        self._assert_pipe_counts(0)
        # Build the link...
        local_end, foreign_end = pubsub.bridge(self.local_dispatch,
                                               self.foreign_dispatch)
        # The bridge counts as one subscriber on each side.
        self._assert_pipe_counts(1)
        # Tear the link down again.
        self.local_dispatch.remove_connector(local_end)
        self.foreign_dispatch.remove_connector(foreign_end)
        # Both Dispatchers are back to zero subscribers.
        self._assert_pipe_counts(0)