# Bundled cherrypy.
# [smonitor.git] / monitor / cherrypy / test / test_states.py
# blob 0f97337485c61a024865ece6cf294b8c6dae47ee
from cherrypy._cpcompat import BadStatusLine, ntob
import os
import sys
import threading
import time

import cherrypy

# Shorthand for the process-wide engine (bus) used throughout this module.
engine = cherrypy.engine

# Directory containing this test module, resolved against the current
# working directory.
thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
class Dependency(object):
    """Bus listener that records which engine events it has received.

    The instance keeps simple counters and flags so that tests can assert
    on what was published to the bus:

    * ``running``    -- True after 'start', False after 'stop'.
    * ``startcount`` -- number of 'start' events received.
    * ``gracecount`` -- number of 'graceful' events received.
    * ``threads``    -- dict keyed by the thread ids that have been
      announced via 'start_thread' and not yet via 'stop_thread'.
    """

    def __init__(self, bus):
        self.bus = bus
        self.running = False
        self.startcount = 0
        self.gracecount = 0
        self.threads = {}

    def subscribe(self):
        """Register every handler of this object on ``self.bus``."""
        handlers = (
            ('start', self.start),
            ('stop', self.stop),
            ('graceful', self.graceful),
            ('start_thread', self.startthread),
            ('stop_thread', self.stopthread),
        )
        for channel, callback in handlers:
            self.bus.subscribe(channel, callback)

    def start(self):
        """Mark the dependency as running and count the start."""
        self.running = True
        self.startcount += 1

    def stop(self):
        """Mark the dependency as no longer running."""
        self.running = False

    def graceful(self):
        """Count one 'graceful' event."""
        self.gracecount += 1

    def startthread(self, thread_id):
        """Remember ``thread_id`` as an active thread."""
        self.threads[thread_id] = None

    def stopthread(self, thread_id):
        """Forget ``thread_id``.

        Raises KeyError if the id was never registered via startthread().
        """
        del self.threads[thread_id]
# Module-level listener bound to the engine; its counters are what the
# state tests in this module assert on.
db_connection = Dependency(engine)
def setup_server():
    """Mount the test application and hook ``db_connection`` onto the bus.

    Defines a small ``Root`` application, mounts it on ``cherrypy.tree``,
    applies the test configuration and finally subscribes the module-level
    ``db_connection`` listener to the engine.
    """
    class Root:
        def index(self):
            return "Hello World"
        index.exposed = True

        def ctrlc(self):
            # Simulate Ctrl-C arriving while a request is being handled.
            raise KeyboardInterrupt()
        ctrlc.exposed = True

        def graceful(self):
            # Trigger a graceful engine restart from inside a handler.
            engine.graceful()
            return "app was (gracefully) restarted succesfully"
        graceful.exposed = True

        def block_explicit(self):
            # Poll until the response has been flagged as timed out, then
            # clear the flag and answer normally.
            while True:
                if cherrypy.response.timed_out:
                    cherrypy.response.timed_out = False
                    return "broken!"
                time.sleep(0.01)
        block_explicit.exposed = True

        def block_implicit(self):
            # Sleep without ever checking the timed_out flag ourselves.
            time.sleep(0.5)
            return "response.timeout = %s" % cherrypy.response.timeout
        block_implicit.exposed = True

    cherrypy.tree.mount(Root())
    cherrypy.config.update({
        'environment': 'test_suite',
        'engine.deadlock_poll_freq': 0.1,
    })

    db_connection.subscribe()
# ------------ Enough helpers. Time for real live test cases. ------------ #

from cherrypy.test import helper
class ServerStateTests(helper.CPWebCase):
    """Exercise engine state transitions (start, stop, graceful, exit).

    Most assertions are made against the module-level ``db_connection``
    listener, which records the bus events it receives.
    """

    setup_server = staticmethod(setup_server)

    def setUp(self):
        cherrypy.server.socket_timeout = 0.1

    def test_0_NormalStateFlow(self):
        engine.stop()
        # Our db_connection should not be running
        self.assertEqual(db_connection.running, False)
        self.assertEqual(db_connection.startcount, 1)
        self.assertEqual(len(db_connection.threads), 0)

        # Test server start
        engine.start()
        self.assertEqual(engine.state, engine.states.STARTED)

        # The port must be occupied now, so check_port is expected to raise.
        host = cherrypy.server.socket_host
        port = cherrypy.server.socket_port
        self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)

        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.startcount, 2)
        self.assertEqual(len(db_connection.threads), 0)

        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)

        # Test engine stop. This will also stop the HTTP server.
        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)

        # Verify that our custom stop function was called
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

        # Block the main thread now and verify that exit() works.
        def exittest():
            self.getPage("/")
            self.assertBody("Hello World")
            engine.exit()
        cherrypy.server.start()
        engine.start_with_callback(exittest)
        engine.block()
        self.assertEqual(engine.state, engine.states.EXITING)

    def test_1_Restart(self):
        cherrypy.server.start()
        engine.start()

        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        grace = db_connection.gracecount

        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)

        # Test server restart from this thread
        engine.graceful()
        self.assertEqual(engine.state, engine.states.STARTED)
        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 1)
        self.assertEqual(len(db_connection.threads), 1)

        # Test server restart from inside a page handler
        self.getPage("/graceful")
        self.assertEqual(engine.state, engine.states.STARTED)
        self.assertBody("app was (gracefully) restarted succesfully")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 2)
        # Since we are requesting synchronously, is only one thread used?
        # Note that the "/graceful" request has been flushed.
        self.assertEqual(len(db_connection.threads), 0)

        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

    def test_2_KeyboardInterrupt(self):
        # Raise a keyboard interrupt in the HTTP server's main thread.
        # We must start the server in this, the main thread
        engine.start()
        cherrypy.server.start()

        self.persistent = True
        try:
            # Make the first request and assert there's no "Connection: close".
            self.getPage("/")
            self.assertStatus('200 OK')
            self.assertBody("Hello World")
            self.assertNoHeader("Connection")

            cherrypy.server.httpserver.interrupt = KeyboardInterrupt
            engine.block()

            self.assertEqual(db_connection.running, False)
            self.assertEqual(len(db_connection.threads), 0)
            self.assertEqual(engine.state, engine.states.EXITING)
        finally:
            self.persistent = False

        # Raise a keyboard interrupt in a page handler; on multithreaded
        # servers, this should occur in one of the worker threads.
        # This should raise a BadStatusLine error, since the worker
        # thread will just die without writing a response.
        engine.start()
        cherrypy.server.start()

        try:
            self.getPage("/ctrlc")
        except BadStatusLine:
            pass
        else:
            print(self.body)
            self.fail("AssertionError: BadStatusLine not raised")

        engine.block()
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

    def test_3_Deadlocks(self):
        cherrypy.config.update({'response.timeout': 0.2})

        engine.start()
        cherrypy.server.start()
        try:
            self.assertNotEqual(engine.timeout_monitor.thread, None)

            # Request a "normal" page.
            self.assertEqual(engine.timeout_monitor.servings, [])
            self.getPage("/")
            self.assertBody("Hello World")
            # request.close is called async.
            while engine.timeout_monitor.servings:
                sys.stdout.write(".")
                time.sleep(0.01)

            # Request a page that explicitly checks itself for deadlock.
            # response.timeout was set to 0.2 secs at the top of this test.
            self.getPage("/block_explicit")
            self.assertBody("broken!")

            # Request a page that implicitly breaks deadlock.
            # If we deadlock, we want to touch as little code as possible,
            # so we won't even call handle_error, just bail ASAP.
            self.getPage("/block_implicit")
            self.assertStatus(500)
            self.assertInBody("raise cherrypy.TimeoutError()")
        finally:
            engine.exit()

    def test_4_Autoreload(self):
        # Start the demo script in a new process
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra='test_case_name: "test_4_Autoreload"')
        p.start(imports='cherrypy.test._test_states_demo')
        try:
            self.getPage("/start")
            start = float(self.body)

            # Give the autoreloader time to cache the file time.
            time.sleep(2)

            # Touch the file
            os.utime(os.path.join(thisdir, "_test_states_demo.py"), None)

            # Give the autoreloader time to re-exec the process
            time.sleep(2)
            host = cherrypy.server.socket_host
            port = cherrypy.server.socket_port
            cherrypy._cpserver.wait_for_occupied_port(host, port)

            # The re-exec'd process must report a later start time.
            self.getPage("/start")
            self.assertTrue(float(self.body) > start)
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

    def test_5_Start_Error(self):
        # If a process errors during start, it should stop the engine
        # and exit with a non-zero exit code.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True)
        p.write_conf(
            extra="""starterror: True
test_case_name: "test_5_Start_Error"
"""
        )
        p.start(imports='cherrypy.test._test_states_demo')
        if p.exit_code == 0:
            self.fail("Process failed to return nonzero exit code.")
class PluginTests(helper.CPWebCase):
    """Tests for engine plugins that need a separately spawned process."""

    def test_daemonize(self):
        if os.name != 'posix':
            return self.skip("skipped (not on posix) ")
        self.HOST = '127.0.0.1'
        self.PORT = 8081
        # Spawn the process and wait, when this returns, the original process
        # is finished. If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True, daemonize=True,
                             socket_host=self.HOST,
                             socket_port=self.PORT)
        p.write_conf(
            extra='test_case_name: "test_daemonize"')
        p.start(imports='cherrypy.test._test_states_demo')
        try:
            # Just get the pid of the daemonization process.
            self.getPage("/pid")
            self.assertStatus(200)
            page_pid = int(self.body)
            self.assertEqual(page_pid, p.get_pid())
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

        # Wait until here to test the exit code because we want to ensure
        # that we wait for the daemon to finish running before we fail.
        if p.exit_code != 0:
            self.fail("Daemonized parent process failed to exit cleanly.")
class SignalHandlingTests(helper.CPWebCase):
    """Send POSIX signals to a spawned server process and check the result.

    Each test skips itself when the signal (or ``os.kill``) it needs is
    not available on the current platform.
    """

    def test_SIGHUP_tty(self):
        # When not daemonized, SIGHUP should shut down the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")

        # Spawn the process.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra='test_case_name: "test_SIGHUP_tty"')
        p.start(imports='cherrypy.test._test_states_demo')
        # Send a SIGHUP
        os.kill(p.get_pid(), SIGHUP)
        # This might hang if things aren't working right, but meh.
        p.join()

    def test_SIGHUP_daemonized(self):
        # When daemonized, SIGHUP should restart the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")

        if os.name != 'posix':
            return self.skip("skipped (not on posix) ")

        # Spawn the process and wait, when this returns, the original process
        # is finished. If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True, daemonize=True)
        p.write_conf(
            extra='test_case_name: "test_SIGHUP_daemonized"')
        p.start(imports='cherrypy.test._test_states_demo')

        pid = p.get_pid()
        try:
            # Send a SIGHUP
            os.kill(pid, SIGHUP)
            # Give the server some time to restart
            time.sleep(2)
            self.getPage("/pid")
            self.assertStatus(200)
            new_pid = int(self.body)
            self.assertNotEqual(new_pid, pid)
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

    def test_SIGTERM(self):
        # SIGTERM should shut down the server whether daemonized or not.
        try:
            from signal import SIGTERM
        except ImportError:
            return self.skip("skipped (no SIGTERM) ")

        # The import only probes whether os.kill exists on this platform.
        try:
            from os import kill
        except ImportError:
            return self.skip("skipped (no os.kill) ")

        # Spawn a normal, undaemonized process.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra='test_case_name: "test_SIGTERM"')
        p.start(imports='cherrypy.test._test_states_demo')
        # Send a SIGTERM
        os.kill(p.get_pid(), SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()

        if os.name == 'posix':
            # Spawn a daemonized process and test again.
            p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                                 wait=True, daemonize=True)
            p.write_conf(
                extra='test_case_name: "test_SIGTERM_2"')
            p.start(imports='cherrypy.test._test_states_demo')
            # Send a SIGTERM
            os.kill(p.get_pid(), SIGTERM)
            # This might hang if things aren't working right, but meh.
            p.join()

    def test_signal_handler_unsubscribe(self):
        try:
            from signal import SIGTERM
        except ImportError:
            return self.skip("skipped (no SIGTERM) ")

        # The import only probes whether os.kill exists on this platform.
        try:
            from os import kill
        except ImportError:
            return self.skip("skipped (no os.kill) ")

        # Spawn a normal, undaemonized process.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra="""unsubsig: True
test_case_name: "test_signal_handler_unsubscribe"
""")
        p.start(imports='cherrypy.test._test_states_demo')
        # Send a SIGTERM
        os.kill(p.get_pid(), SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()

        # Assert the old handler ran. The message is looked for on the
        # tenth line from the end of the error log; the file is closed
        # again as soon as it has been read.
        with open(p.error_log, 'rb') as log:
            target_line = log.readlines()[-10]
        if ntob("I am an old SIGTERM handler.") not in target_line:
            self.fail("Old SIGTERM handler did not run.\n%r" % target_line)