Bundled cherrypy.
[smonitor.git] / monitor / cherrypy / test / test_bus.py
blob51c102201aeca494550879b894356f3db665f3e6
1 import threading
2 import time
3 import unittest
5 import cherrypy
6 from cherrypy._cpcompat import get_daemon, set
7 from cherrypy.process import wspbus
# Template for the line each test listener records: the listener's index,
# the channel it was subscribed to, and the argument it was called with.
msg = "Listener %d on channel %s: %s."
class PublishSubscribeTests(unittest.TestCase):
    """Check that ``wspbus.Bus`` delivers published messages to subscribers.

    Listeners built by ``get_listener`` append a formatted line to
    ``self.responses``; each test compares that record, in order, with the
    lines it expects.
    """

    def get_listener(self, channel, index):
        """Return a callback that records its invocation.

        The callback appends the module-level ``msg`` template, filled with
        ``index``, ``channel`` and the argument it received (``None`` when
        called without one), to ``self.responses``.
        """
        def record(arg=None):
            line = msg % (index, channel, arg)
            self.responses.append(line)
        return record

    def test_builtin_channels(self):
        # Subscribe four listeners to every channel a new Bus already has,
        # then publish to each channel with and without an argument.
        bus = wspbus.Bus()
        self.responses = []
        expected = []

        priorities = [100, 50, 0, 51]
        for channel in bus.listeners:
            for index, priority in enumerate(priorities):
                callback = self.get_listener(channel, index)
                bus.subscribe(channel, callback, priority)

        # Listener indexes sorted by ascending priority (0, 50, 51, 100);
        # this is the call order the test requires.
        firing_order = (2, 1, 3, 0)
        for channel in bus.listeners:
            bus.publish(channel)
            for index in firing_order:
                expected.append(msg % (index, channel, None))
            bus.publish(channel, arg=79347)
            for index in firing_order:
                expected.append(msg % (index, channel, 79347))

        self.assertEqual(self.responses, expected)

    def test_custom_channels(self):
        # Same check as above, but on channel names the test introduces.
        bus = wspbus.Bus()
        self.responses = []
        expected = []

        custom_listeners = ('hugh', 'louis', 'dewey')
        priorities = [None, 10, 60, 40]
        for channel in custom_listeners:
            for index, priority in enumerate(priorities):
                callback = self.get_listener(channel, index)
                bus.subscribe(channel, callback, priority)

        # Expected call order: priorities 10, 40, None, 60.  The None entry
        # landing between 40 and 60 presumably reflects the bus's default
        # priority -- confirm against wspbus.Bus.subscribe.
        firing_order = (1, 3, 0, 2)
        for channel in custom_listeners:
            bus.publish(channel, 'ah so')
            for index in firing_order:
                expected.append(msg % (index, channel, 'ah so'))
            bus.publish(channel)
            for index in firing_order:
                expected.append(msg % (index, channel, None))

        self.assertEqual(self.responses, expected)

    def test_listener_errors(self):
        # A failing listener must surface as ChannelFailures while the
        # well-behaved listener on the same channel still gets called.
        bus = wspbus.Bus()
        self.responses = []
        expected = []

        # The 'log' channel is left out of this test.
        channels = [name for name in bus.listeners if name != 'log']

        for channel in channels:
            good_listener = self.get_listener(channel, 1)
            bus.subscribe(channel, good_listener)
            # This will break since the lambda takes no args.
            bus.subscribe(channel, lambda: None, priority=20)

        for channel in channels:
            self.assertRaises(wspbus.ChannelFailures,
                              bus.publish, channel, 123)
            expected.append(msg % (1, channel, 123))

        self.assertEqual(self.responses, expected)
class BusMethodTests(unittest.TestCase):
    """Exercise the state-changing and logging methods of ``wspbus.Bus``.

    Each test builds a fresh bus, usually captures its 'log' channel with
    ``log``, drives one bus method, and then checks which listeners were
    called, the resulting bus state and the messages that were logged.
    """

    def log(self, bus):
        """Start recording messages published on ``bus``'s 'log' channel.

        Only the message text is kept (the level is discarded); entries
        accumulate in ``self._log_entries``.
        """
        self._log_entries = []

        def logit(msg, level):
            self._log_entries.append(msg)
        bus.subscribe('log', logit)

    def assertLog(self, entries):
        """Assert that the recorded log messages equal ``entries`` exactly."""
        self.assertEqual(self._log_entries, entries)

    def get_listener(self, channel, index):
        """Return a callback that records its invocation.

        The callback appends the module-level ``msg`` template, filled with
        ``index``, ``channel`` and the argument it received (``None`` when
        called without one), to ``self.responses``.
        """
        def listener(arg=None):
            self.responses.append(msg % (index, channel, arg))
        return listener

    def test_start(self):
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('start', self.get_listener('start', index))

        b.start()
        try:
            # The start method MUST call all 'start' listeners.
            self.assertEqual(
                set(self.responses),
                set([msg % (i, 'start', None) for i in range(num)]))
            # The start method MUST move the state to STARTED
            # (or EXITING, if errors occur)
            self.assertEqual(b.state, b.states.STARTED)
            # The start method MUST log its states.
            self.assertLog(['Bus STARTING', 'Bus STARTED'])
        finally:
            # Exit so the atexit handler doesn't complain.
            b.exit()

    def test_stop(self):
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('stop', self.get_listener('stop', index))

        b.stop()

        # The stop method MUST call all 'stop' listeners.
        self.assertEqual(
            set(self.responses),
            set([msg % (i, 'stop', None) for i in range(num)]))
        # The stop method MUST move the state to STOPPED
        self.assertEqual(b.state, b.states.STOPPED)
        # The stop method MUST log its states.
        self.assertLog(['Bus STOPPING', 'Bus STOPPED'])

    def test_graceful(self):
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('graceful', self.get_listener('graceful', index))

        b.graceful()

        # The graceful method MUST call all 'graceful' listeners.
        self.assertEqual(
            set(self.responses),
            set([msg % (i, 'graceful', None) for i in range(num)]))
        # The graceful method MUST log its states.
        self.assertLog(['Bus graceful'])

    def test_exit(self):
        b = wspbus.Bus()
        self.log(b)

        self.responses = []
        num = 3
        for index in range(num):
            b.subscribe('stop', self.get_listener('stop', index))
            b.subscribe('exit', self.get_listener('exit', index))

        b.exit()

        # The exit method MUST call all 'stop' listeners,
        # and then all 'exit' listeners.
        self.assertEqual(
            set(self.responses),
            set([msg % (i, 'stop', None) for i in range(num)] +
                [msg % (i, 'exit', None) for i in range(num)]))
        # The exit method MUST move the state to EXITING
        self.assertEqual(b.state, b.states.EXITING)
        # The exit method MUST log its states.
        self.assertLog(['Bus STOPPING', 'Bus STOPPED',
                        'Bus EXITING', 'Bus EXITED'])

    def test_wait(self):
        b = wspbus.Bus()

        def f(method):
            # Delay the state change so the main thread is already waiting.
            time.sleep(0.2)
            getattr(b, method)()

        for method, states in [('start', [b.states.STARTED]),
                               ('stop', [b.states.STOPPED]),
                               ('start', [b.states.STARTING,
                                          b.states.STARTED]),
                               ('exit', [b.states.EXITING]),
                               ]:
            worker = threading.Thread(target=f, args=(method,))
            worker.start()
            b.wait(states)

            # The wait method MUST wait for the given state(s).
            if b.state not in states:
                self.fail("State %r not in %r" % (b.state, states))

            # Let the worker finish its bus call before the next round so
            # no non-daemon thread outlives this test.
            worker.join()

    def test_block(self):
        b = wspbus.Bus()
        self.log(b)

        def f():
            time.sleep(0.2)
            b.exit()

        def g():
            # Outlives f so block() has a child thread left to wait for.
            time.sleep(0.4)
        threading.Thread(target=f).start()
        threading.Thread(target=g).start()
        # Main thread plus the two workers started above.
        threads = [t for t in threading.enumerate() if not get_daemon(t)]
        self.assertEqual(len(threads), 3)

        b.block()

        # The block method MUST wait for the EXITING state.
        self.assertEqual(b.state, b.states.EXITING)
        # The block method MUST wait for ALL non-main, non-daemon threads
        # to finish.
        threads = [t for t in threading.enumerate() if not get_daemon(t)]
        self.assertEqual(len(threads), 1)
        # The last message will mention an indeterminable thread name;
        # ignore it
        self.assertEqual(self._log_entries[:-1],
                         ['Bus STOPPING', 'Bus STOPPED',
                          'Bus EXITING', 'Bus EXITED',
                          'Waiting for child threads to terminate...'])

    def test_start_with_callback(self):
        b = wspbus.Bus()
        self.log(b)
        try:
            events = []

            def f(*args, **kwargs):
                events.append(("f", args, kwargs))

            def g():
                events.append("g")
            b.subscribe("start", g)
            b.start_with_callback(f, (1, 3, 5), {"foo": "bar"})
            # Give wait() time to run f()
            time.sleep(0.2)

            # The callback method MUST wait for the STARTED state.
            self.assertEqual(b.state, b.states.STARTED)
            # The callback method MUST run after all start methods.
            self.assertEqual(events,
                             ["g", ("f", (1, 3, 5), {"foo": "bar"})])
        finally:
            b.exit()

    def test_log(self):
        b = wspbus.Bus()
        self.log(b)
        self.assertLog([])

        # Try a normal message.
        expected = []
        for entry in ["O mah darlin'"] * 3 + ["Clementiiiiiiiine"]:
            b.log(entry)
            expected.append(entry)
            self.assertLog(expected)

        # Try an error message
        try:
            # Deliberately reference an undefined name so that a NameError
            # is the active exception when log() is asked for a traceback.
            foo
        except NameError:
            b.log("You are lost and gone forever", traceback=True)
            lastmsg = self._log_entries[-1]
            if "Traceback" not in lastmsg or "NameError" not in lastmsg:
                self.fail("Last log message %r did not contain "
                          "the expected traceback." % lastmsg)
        else:
            self.fail("NameError was not raised as expected.")
# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()