1 from cherrypy
._cpcompat
import BadStatusLine
, ntob
# Short alias for CherryPy's engine object; the helpers and test cases in
# this module refer to it as `engine` (e.g. engine.state, engine.states).
engine = cherrypy.engine
# Directory part of this module's path, anchored at the current working
# directory. os.path.join discards the cwd prefix when the directory part
# is already absolute, so the result is always an absolute path.
thisdir = os.path.join(os.getcwd(), os.path.split(__file__)[0])
14 def __init__(self
, bus
):
22 self
.bus
.subscribe('start', self
.start
)
23 self
.bus
.subscribe('stop', self
.stop
)
24 self
.bus
.subscribe('graceful', self
.graceful
)
25 self
.bus
.subscribe('start_thread', self
.startthread
)
26 self
.bus
.subscribe('stop_thread', self
.stopthread
)
def startthread(self, thread_id):
    """Record ``thread_id`` in ``self.threads`` with a value of ``None``.

    This is the callback that ``__init__`` subscribes to the bus's
    'start_thread' channel. An existing entry for the same id is
    overwritten with ``None``.
    """
    registry = self.threads
    registry[thread_id] = None
def stopthread(self, thread_id):
    """Remove the entry for ``thread_id`` from ``self.threads``.

    This is the callback that ``__init__`` subscribes to the bus's
    'stop_thread' channel. The lookup error raised by the container
    (``KeyError`` for a dict) propagates if the id was never recorded.
    """
    registry = self.threads
    del registry[thread_id]
# Shared Dependency instance built around the engine. The test cases below
# inspect its `running`, `startcount`, `gracecount` and `threads` attributes
# to verify which bus events were delivered.
db_connection = Dependency(engine)
53 raise KeyboardInterrupt()
58 return "app was (gracefully) restarted succesfully"
59 graceful
.exposed
= True
61 def block_explicit(self
):
63 if cherrypy
.response
.timed_out
:
64 cherrypy
.response
.timed_out
= False
67 block_explicit
.exposed
= True
69 def block_implicit(self
):
71 return "response.timeout = %s" % cherrypy
.response
.timeout
72 block_implicit
.exposed
= True
74 cherrypy
.tree
.mount(Root())
75 cherrypy
.config
.update({
76 'environment': 'test_suite',
77 'engine.deadlock_poll_freq': 0.1,
80 db_connection
.subscribe()
84 # ------------ Enough helpers. Time for real live test cases. ------------ #
87 from cherrypy
.test
import helper
89 class ServerStateTests(helper
.CPWebCase
):
90 setup_server
= staticmethod(setup_server
)
93 cherrypy
.server
.socket_timeout
= 0.1
95 def test_0_NormalStateFlow(self
):
97 # Our db_connection should not be running
98 self
.assertEqual(db_connection
.running
, False)
99 self
.assertEqual(db_connection
.startcount
, 1)
100 self
.assertEqual(len(db_connection
.threads
), 0)
104 self
.assertEqual(engine
.state
, engine
.states
.STARTED
)
106 host
= cherrypy
.server
.socket_host
107 port
= cherrypy
.server
.socket_port
108 self
.assertRaises(IOError, cherrypy
._cpserver
.check_port
, host
, port
)
110 # The db_connection should be running now
111 self
.assertEqual(db_connection
.running
, True)
112 self
.assertEqual(db_connection
.startcount
, 2)
113 self
.assertEqual(len(db_connection
.threads
), 0)
116 self
.assertBody("Hello World")
117 self
.assertEqual(len(db_connection
.threads
), 1)
119 # Test engine stop. This will also stop the HTTP server.
121 self
.assertEqual(engine
.state
, engine
.states
.STOPPED
)
123 # Verify that our custom stop function was called
124 self
.assertEqual(db_connection
.running
, False)
125 self
.assertEqual(len(db_connection
.threads
), 0)
127 # Block the main thread now and verify that exit() works.
130 self
.assertBody("Hello World")
132 cherrypy
.server
.start()
133 engine
.start_with_callback(exittest
)
135 self
.assertEqual(engine
.state
, engine
.states
.EXITING
)
137 def test_1_Restart(self
):
138 cherrypy
.server
.start()
141 # The db_connection should be running now
142 self
.assertEqual(db_connection
.running
, True)
143 grace
= db_connection
.gracecount
146 self
.assertBody("Hello World")
147 self
.assertEqual(len(db_connection
.threads
), 1)
149 # Test server restart from this thread
151 self
.assertEqual(engine
.state
, engine
.states
.STARTED
)
153 self
.assertBody("Hello World")
154 self
.assertEqual(db_connection
.running
, True)
155 self
.assertEqual(db_connection
.gracecount
, grace
+ 1)
156 self
.assertEqual(len(db_connection
.threads
), 1)
158 # Test server restart from inside a page handler
159 self
.getPage("/graceful")
160 self
.assertEqual(engine
.state
, engine
.states
.STARTED
)
161 self
.assertBody("app was (gracefully) restarted succesfully")
162 self
.assertEqual(db_connection
.running
, True)
163 self
.assertEqual(db_connection
.gracecount
, grace
+ 2)
164 # Since we are requesting synchronously, is only one thread used?
165 # Note that the "/graceful" request has been flushed.
166 self
.assertEqual(len(db_connection
.threads
), 0)
169 self
.assertEqual(engine
.state
, engine
.states
.STOPPED
)
170 self
.assertEqual(db_connection
.running
, False)
171 self
.assertEqual(len(db_connection
.threads
), 0)
173 def test_2_KeyboardInterrupt(self
):
174 # Raise a keyboard interrupt in the HTTP server's main thread.
175 # We must start the server in this, the main thread
177 cherrypy
.server
.start()
179 self
.persistent
= True
181 # Make the first request and assert there's no "Connection: close".
183 self
.assertStatus('200 OK')
184 self
.assertBody("Hello World")
185 self
.assertNoHeader("Connection")
187 cherrypy
.server
.httpserver
.interrupt
= KeyboardInterrupt
190 self
.assertEqual(db_connection
.running
, False)
191 self
.assertEqual(len(db_connection
.threads
), 0)
192 self
.assertEqual(engine
.state
, engine
.states
.EXITING
)
194 self
.persistent
= False
196 # Raise a keyboard interrupt in a page handler; on multithreaded
197 # servers, this should occur in one of the worker threads.
198 # This should raise a BadStatusLine error, since the worker
199 # thread will just die without writing a response.
201 cherrypy
.server
.start()
204 self
.getPage("/ctrlc")
205 except BadStatusLine
:
209 self
.fail("AssertionError: BadStatusLine not raised")
212 self
.assertEqual(db_connection
.running
, False)
213 self
.assertEqual(len(db_connection
.threads
), 0)
215 def test_3_Deadlocks(self
):
216 cherrypy
.config
.update({'response.timeout': 0.2})
219 cherrypy
.server
.start()
221 self
.assertNotEqual(engine
.timeout_monitor
.thread
, None)
223 # Request a "normal" page.
224 self
.assertEqual(engine
.timeout_monitor
.servings
, [])
226 self
.assertBody("Hello World")
227 # request.close is called async.
228 while engine
.timeout_monitor
.servings
:
229 sys
.stdout
.write(".")
232 # Request a page that explicitly checks itself for deadlock.
233 # The response timeout is 0.2 secs (set via cherrypy.config.update above).
234 self
.getPage("/block_explicit")
235 self
.assertBody("broken!")
237 # Request a page that implicitly breaks deadlock.
238 # If we deadlock, we want to touch as little code as possible,
239 # so we won't even call handle_error, just bail ASAP.
240 self
.getPage("/block_implicit")
241 self
.assertStatus(500)
242 self
.assertInBody("raise cherrypy.TimeoutError()")
246 def test_4_Autoreload(self
):
247 # Start the demo script in a new process
248 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'))
250 extra
='test_case_name: "test_4_Autoreload"')
251 p
.start(imports
='cherrypy.test._test_states_demo')
253 self
.getPage("/start")
254 start
= float(self
.body
)
256 # Give the autoreloader time to cache the file time.
260 os
.utime(os
.path
.join(thisdir
, "_test_states_demo.py"), None)
262 # Give the autoreloader time to re-exec the process
264 host
= cherrypy
.server
.socket_host
265 port
= cherrypy
.server
.socket_port
266 cherrypy
._cpserver
.wait_for_occupied_port(host
, port
)
268 self
.getPage("/start")
269 self
.assert_(float(self
.body
) > start
)
271 # Shut down the spawned process
272 self
.getPage("/exit")
275 def test_5_Start_Error(self
):
276 # If a process errors during start, it should stop the engine
277 # and exit with a non-zero exit code.
278 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'),
281 extra
="""starterror: True
282 test_case_name: "test_5_Start_Error"
285 p
.start(imports
='cherrypy.test._test_states_demo')
287 self
.fail("Process failed to return nonzero exit code.")
290 class PluginTests(helper
.CPWebCase
):
291 def test_daemonize(self
):
292 if os
.name
not in ['posix']:
293 return self
.skip("skipped (not on posix) ")
294 self
.HOST
= '127.0.0.1'
296 # Spawn the process and wait, when this returns, the original process
297 # is finished. If it daemonized properly, we should still be able
299 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'),
300 wait
=True, daemonize
=True,
301 socket_host
='127.0.0.1',
304 extra
='test_case_name: "test_daemonize"')
305 p
.start(imports
='cherrypy.test._test_states_demo')
307 # Just get the pid of the daemonization process.
309 self
.assertStatus(200)
310 page_pid
= int(self
.body
)
311 self
.assertEqual(page_pid
, p
.get_pid())
313 # Shut down the spawned process
314 self
.getPage("/exit")
317 # Wait until here to test the exit code because we want to ensure
318 # that we wait for the daemon to finish running before we fail.
320 self
.fail("Daemonized parent process failed to exit cleanly.")
323 class SignalHandlingTests(helper
.CPWebCase
):
324 def test_SIGHUP_tty(self
):
325 # When not daemonized, SIGHUP should shut down the server.
327 from signal
import SIGHUP
329 return self
.skip("skipped (no SIGHUP) ")
332 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'))
334 extra
='test_case_name: "test_SIGHUP_tty"')
335 p
.start(imports
='cherrypy.test._test_states_demo')
337 os
.kill(p
.get_pid(), SIGHUP
)
338 # This might hang if things aren't working right, but meh.
341 def test_SIGHUP_daemonized(self
):
342 # When daemonized, SIGHUP should restart the server.
344 from signal
import SIGHUP
346 return self
.skip("skipped (no SIGHUP) ")
348 if os
.name
not in ['posix']:
349 return self
.skip("skipped (not on posix) ")
351 # Spawn the process and wait, when this returns, the original process
352 # is finished. If it daemonized properly, we should still be able
354 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'),
355 wait
=True, daemonize
=True)
357 extra
='test_case_name: "test_SIGHUP_daemonized"')
358 p
.start(imports
='cherrypy.test._test_states_demo')
364 # Give the server some time to restart
367 self
.assertStatus(200)
368 new_pid
= int(self
.body
)
369 self
.assertNotEqual(new_pid
, pid
)
371 # Shut down the spawned process
372 self
.getPage("/exit")
375 def test_SIGTERM(self
):
376 # SIGTERM should shut down the server whether daemonized or not.
378 from signal
import SIGTERM
380 return self
.skip("skipped (no SIGTERM) ")
385 return self
.skip("skipped (no os.kill) ")
387 # Spawn a normal, undaemonized process.
388 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'))
390 extra
='test_case_name: "test_SIGTERM"')
391 p
.start(imports
='cherrypy.test._test_states_demo')
393 os
.kill(p
.get_pid(), SIGTERM
)
394 # This might hang if things aren't working right, but meh.
397 if os
.name
in ['posix']:
398 # Spawn a daemonized process and test again.
399 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'),
400 wait
=True, daemonize
=True)
402 extra
='test_case_name: "test_SIGTERM_2"')
403 p
.start(imports
='cherrypy.test._test_states_demo')
405 os
.kill(p
.get_pid(), SIGTERM
)
406 # This might hang if things aren't working right, but meh.
409 def test_signal_handler_unsubscribe(self
):
411 from signal
import SIGTERM
413 return self
.skip("skipped (no SIGTERM) ")
418 return self
.skip("skipped (no os.kill) ")
420 # Spawn a normal, undaemonized process.
421 p
= helper
.CPProcess(ssl
=(self
.scheme
.lower()=='https'))
423 extra
="""unsubsig: True
424 test_case_name: "test_signal_handler_unsubscribe"
426 p
.start(imports
='cherrypy.test._test_states_demo')
428 os
.kill(p
.get_pid(), SIGTERM
)
429 # This might hang if things aren't working right, but meh.
432 # Assert the old handler ran.
433 target_line
= open(p
.error_log
, 'rb').readlines()[-10]
434 if not ntob("I am an old SIGTERM handler.") in target_line
:
435 self
.fail("Old SIGTERM handler did not run.\n%r" % target_line
)