Bundled cherrypy.
[smonitor.git] / monitor / cherrypy / test / helper.py
blobff9e06cfc438c1f8bcc5ee82a16648d0f47172b7
1 """A library of helper functions for the CherryPy test suite."""
3 import datetime
4 import logging
5 log = logging.getLogger(__name__)
6 import os
7 thisdir = os.path.abspath(os.path.dirname(__file__))
8 serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem')
10 import re
11 import sys
12 import time
13 import warnings
15 import cherrypy
16 from cherrypy._cpcompat import basestring, copyitems, HTTPSConnection, ntob
17 from cherrypy.lib import httputil
18 from cherrypy.lib.reprconf import unrepr
19 from cherrypy.test import webtest
21 import nose
# Cache of the base test configuration; populated on the first call to
# get_tst_config() and reused (copied) on every later call.
_testconfig = None


def get_tst_config(overconf=None):
    """Return the test-suite configuration as a fresh dict.

    The base configuration (built-in defaults, optionally overridden by the
    'supervisor' section of the ``testconfig`` module when that module can be
    imported) is computed once and cached in the module-level ``_testconfig``.
    Every call returns a new copy of it, so callers may mutate the result.

    overconf: optional mapping of per-call overrides applied on top of the
        cached base configuration. ``None`` (the default) means no overrides.
    """
    global _testconfig
    if _testconfig is None:
        conf = {
            'scheme': 'http',
            'protocol': "HTTP/1.1",
            'port': 8080,
            'host': '127.0.0.1',
            'validate': False,
            'conquer': False,
            'server': 'wsgi',
        }
        try:
            import testconfig
            _conf = testconfig.config.get('supervisor', None)
            if _conf is not None:
                # String values are parsed with unrepr; collect the results
                # in a separate dict so the testconfig module's own mapping
                # is left untouched.
                parsed = {}
                for k, v in _conf.items():
                    if isinstance(v, basestring):
                        v = unrepr(v)
                    parsed[k] = v
                conf.update(parsed)
        except ImportError:
            # No testconfig module available: keep the built-in defaults.
            pass
        _testconfig = conf

    conf = _testconfig.copy()
    if overconf is not None:
        conf.update(overconf)

    return conf
class Supervisor(object):
    """Base class for modeling and controlling servers during testing."""

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute.

        The 'port' value is converted with int() before being stored; all
        other values are stored unchanged.
        """
        for k, v in kwargs.items():
            if k == 'port':
                # Without the else branch the converted value would be
                # overwritten by the raw one immediately afterwards.
                setattr(self, k, int(v))
            else:
                setattr(self, k, v)
def log_to_stderr(msg, level):
    """Write msg followed by the platform line separator to sys.stderr.

    The level argument is accepted but ignored. Returns whatever
    sys.stderr.write returns.
    """
    line = msg + os.linesep
    return sys.stderr.write(line)
class LocalSupervisor(Supervisor):
    """Base class for modeling/controlling servers which run in the same process.

    When the server side runs in a different process, start/stop can dump all
    state between each test module easily. When the server side runs in the
    same process as the client, however, we have to do a bit more work to ensure
    config and mounted apps are reset between tests.
    """

    using_apache = False
    using_wsgi = False

    def __init__(self, **kwargs):
        """Store kwargs as attributes and hook up the in-process server.

        Values are stored as given (Supervisor.__init__ is not called here).
        Subclasses are expected to provide ``httpserver_class``.
        """
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

        cherrypy.server.httpserver = self.httpserver_class

        # Subscribe whichever of the optional engine handlers exist.
        bus = cherrypy.engine
        for handler_name in ("signal_handler", "console_control_handler"):
            if hasattr(bus, handler_name):
                getattr(bus, handler_name).subscribe()
        # Debugging aid (disabled): subscribe log_to_stderr to the engine's
        # 'log' channel, i.e. engine.subscribe('log', log_to_stderr).

    def start(self, modulename=None):
        """Load and start the HTTP server."""
        if modulename:
            # Dropping the current httpserver makes cherrypy.server.start()
            # build a new one (with config from setup_server, if declared).
            cherrypy.server.httpserver = None

        cherrypy.engine.start()
        self.sync_apps()

    def sync_apps(self):
        """Tell the server about any apps which the setup functions mounted."""
        pass

    def stop(self):
        """Run the optional teardown hook, exit the engine, drop extra servers."""
        teardown_hook = getattr(self, 'teardown', None)
        if teardown_hook:
            teardown_hook()

        cherrypy.engine.exit()

        # Iterate over a copy: entries are deleted while looping.
        extra_servers = getattr(cherrypy, 'servers', {})
        for server_name, extra_server in copyitems(extra_servers):
            extra_server.unsubscribe()
            del cherrypy.servers[server_name]
class NativeServerSupervisor(LocalSupervisor):
    """Server supervisor for the builtin HTTP server."""

    # Dotted name of the server class; LocalSupervisor.__init__ assigns it
    # to cherrypy.server.httpserver.
    httpserver_class = "cherrypy._cpnative_server.CPHTTPServer"
    using_apache = False
    using_wsgi = False

    def __str__(self):
        """Describe the server and the host:port it is configured for."""
        address = (self.host, self.port)
        return "Builtin HTTP Server on %s:%s" % address
class LocalWSGISupervisor(LocalSupervisor):
    """Server supervisor for the builtin WSGI server."""

    # Dotted name of the server class; LocalSupervisor.__init__ assigns it
    # to cherrypy.server.httpserver.
    httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer"
    using_apache = False
    using_wsgi = True

    def __str__(self):
        """Describe the server and the host:port it is configured for."""
        address = (self.host, self.port)
        return "Builtin WSGI Server on %s:%s" % address

    def sync_apps(self):
        """Hook a new WSGI app into the origin server."""
        wsgi_app = self.get_app()
        cherrypy.server.httpserver.wsgi_app = wsgi_app

    def get_app(self, app=None):
        """Obtain a new (decorated) WSGI app to hook into the origin server.

        Starts from ``app`` (or cherrypy.tree when app is None) and wraps it
        with the optional middlewares selected by self.conquer and
        self.validate. A middleware that cannot be imported is skipped with
        a warning.
        """
        wrapped = app
        if wrapped is None:
            wrapped = cherrypy.tree

        if self.conquer:
            try:
                import wsgiconq
            except ImportError:
                warnings.warn("Error importing wsgiconq. pyconquer will not run.")
            else:
                wrapped = wsgiconq.WSGILogger(wrapped, c_calls=True)

        if self.validate:
            try:
                from wsgiref import validate
            except ImportError:
                warnings.warn("Error importing wsgiref. The validator will not run.")
            else:
                # Wrap the app in the wsgiref validator.
                wrapped = validate.validator(wrapped)

        return wrapped
def get_cpmodpy_supervisor(**options):
    """Build a ModPythonSupervisor that uses the modpy.conf_cpmodpy template."""
    # Imported on demand, only when this factory is actually called.
    from cherrypy.test import modpy
    supervisor = modpy.ModPythonSupervisor(**options)
    supervisor.template = modpy.conf_cpmodpy
    return supervisor
def get_modpygw_supervisor(**options):
    """Build a ModPythonSupervisor using the modpython_gateway template.

    The returned supervisor is flagged as using WSGI.
    """
    # Imported on demand, only when this factory is actually called.
    from cherrypy.test import modpy
    supervisor = modpy.ModPythonSupervisor(**options)
    supervisor.template = modpy.conf_modpython_gateway
    supervisor.using_wsgi = True
    return supervisor
def get_modwsgi_supervisor(**options):
    """Build a modwsgi.ModWSGISupervisor from the given options."""
    # Imported on demand, only when this factory is actually called.
    from cherrypy.test import modwsgi
    supervisor = modwsgi.ModWSGISupervisor(**options)
    return supervisor
def get_modfcgid_supervisor(**options):
    """Build a modfcgid.ModFCGISupervisor from the given options."""
    # Imported on demand, only when this factory is actually called.
    from cherrypy.test import modfcgid
    supervisor = modfcgid.ModFCGISupervisor(**options)
    return supervisor
def get_modfastcgi_supervisor(**options):
    """Build a modfastcgi.ModFCGISupervisor from the given options."""
    # Imported on demand, only when this factory is actually called.
    from cherrypy.test import modfastcgi
    supervisor = modfastcgi.ModFCGISupervisor(**options)
    return supervisor
def get_wsgi_u_supervisor(**options):
    """Set cherrypy.server.wsgi_version to ('u', 0) and build a LocalWSGISupervisor."""
    # The version is set before the supervisor is constructed.
    cherrypy.server.wsgi_version = ('u', 0)
    supervisor = LocalWSGISupervisor(**options)
    return supervisor
class CPWebCase(webtest.WebCase):
    """WebCase that starts and stops a CherryPy server around each test class."""

    script_name = ""
    scheme = "http"

    # Maps the 'server' config value to a supervisor class or factory.
    available_servers = {
        'wsgi': LocalWSGISupervisor,
        'wsgi_u': get_wsgi_u_supervisor,
        'native': NativeServerSupervisor,
        'cpmodpy': get_cpmodpy_supervisor,
        'modpygw': get_modpygw_supervisor,
        'modwsgi': get_modwsgi_supervisor,
        'modfcgid': get_modfcgid_supervisor,
        'modfastcgi': get_modfastcgi_supervisor,
    }
    default_server = "wsgi"

    def _setup_server(cls, supervisor, conf):
        """Log the environment and return the base config for the server.

        conf may be a config file name (its 'global' section is used), a
        dict, or a false value (treated as an empty dict). The returned dict
        is a copy of it extended with the socket, protocol and environment
        settings taken from supervisor, plus the SSL certificate settings
        when the supervisor's scheme is https.
        """
        is_https = (supervisor.scheme == "https")
        ssl = ""
        if is_https:
            ssl = " (ssl)"

        log.info("Python version used to run this test script: %s",
                 sys.version.split()[0])
        log.info("CherryPy version: %s", cherrypy.__version__)
        log.info("HTTP server version: %s%s", supervisor.protocol, ssl)
        log.info("PID: %s", os.getpid())

        cherrypy.server.using_apache = supervisor.using_apache
        cherrypy.server.using_wsgi = supervisor.using_wsgi

        if sys.platform.startswith('java'):
            cherrypy.config.update({'server.nodelay': False})

        if isinstance(conf, basestring):
            parser = cherrypy.lib.reprconf.Parser()
            conf = parser.dict_from_file(conf).get('global', {})
        elif not conf:
            conf = {}

        baseconf = conf.copy()
        baseconf['server.socket_host'] = supervisor.host
        baseconf['server.socket_port'] = supervisor.port
        baseconf['server.protocol_version'] = supervisor.protocol
        baseconf['environment'] = "test_suite"

        if is_https:
            # The same PEM file serves as both certificate and private key.
            # (Disabled option: baseconf['server.ssl_module'] = 'builtin')
            baseconf['server.ssl_certificate'] = serverpem
            baseconf['server.ssl_private_key'] = serverpem

            # NOTE(review): a comment originally at this point described
            # importing helper lazily via "from cherrypy.test import helper"
            # so that only one webtest instance lives in sys.modules; no such
            # import happens here, so it appears to be a leftover.
            webtest.WebCase.HTTP_CONN = HTTPSConnection

        return baseconf
    _setup_server = classmethod(_setup_server)

    def setup_class(cls):
        """Create the configured supervisor and start the server for this class."""
        # Create a server supervisor from the test configuration.
        conf = get_tst_config()
        server_key = conf.get('server', 'wsgi')
        supervisor_factory = cls.available_servers.get(server_key)
        if supervisor_factory is None:
            raise RuntimeError('Unknown server in config: %s' % conf['server'])
        supervisor = supervisor_factory(**conf)

        # Copied from "run_test_suite"
        cherrypy.config.reset()
        cherrypy.config.update(cls._setup_server(supervisor, conf))
        setup_client()

        if hasattr(cls, 'setup_server'):
            # Start from an empty tree and no http server so both can be
            # rebuilt with the new root mounted by setup_server.
            cherrypy.tree = cherrypy._cptree.Tree()
            cherrypy.server.httpserver = None
            cls.setup_server()
            supervisor.start(cls.__module__)

        cls.supervisor = supervisor
    setup_class = classmethod(setup_class)

    def teardown_class(cls):
        """Stop the supervisor if this class declared a setup_server hook."""
        if hasattr(cls, 'setup_server'):
            cls.supervisor.stop()
    teardown_class = classmethod(teardown_class)

    def prefix(self):
        """Return script_name without trailing slashes."""
        return self.script_name.rstrip("/")

    def base(self):
        """Return scheme://host[:port] followed by the slash-stripped script_name.

        The port is omitted when it is the default one for the scheme
        (80 for http, 443 for https).
        """
        if (self.scheme, self.PORT) in (("http", 80), ("https", 443)):
            port = ""
        else:
            port = ":%s" % self.PORT

        parts = (self.scheme, self.HOST, port, self.script_name.rstrip("/"))
        return "%s://%s%s%s" % parts

    def exit(self):
        """Exit the interpreter via sys.exit()."""
        sys.exit()

    def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
        """Open the url. Return status, headers, body."""
        target = url
        if self.script_name:
            # Requests are made relative to the mount point.
            target = httputil.urljoin(self.script_name, target)
        return webtest.WebCase.getPage(self, target, headers, method, body,
                                       protocol)

    def skip(self, msg='skipped '):
        """Skip the current test by raising nose.SkipTest(msg)."""
        raise nose.SkipTest(msg)

    def assertErrorPage(self, status, message=None, pattern=''):
        """Compare the response body with a built in error page.

        The function will optionally look for the regexp pattern,
        within the exception embedded in the error page. Passing
        pattern=None asserts that the page contains no traceback at all.
        """
        # The reference page never contains a traceback.
        page = cherrypy._cperror.get_error_page(status, message=message)

        # Match the body while ignoring the traceback: the empty traceback
        # element of the reference page becomes a match-all group that
        # captures whatever traceback the response carries.
        esc = re.escape
        empty_tb = esc('<pre id="traceback"></pre>')
        capture_tb = esc('<pre id="traceback">') + '(.*)' + esc('</pre>')
        epage = esc(page).replace(empty_tb, capture_tb)
        m = re.match(ntob(epage, self.encoding), self.body, re.DOTALL)
        if not m:
            self._handlewebError('Error page does not match; expected:\n' + page)
            return

        # From here on the match succeeded; examine the captured traceback.
        traceback_text = m.group(1)
        if pattern is None:
            # Special-case None to mean that there should be *no* traceback.
            if traceback_text:
                self._handlewebError('Error page contains traceback')
        elif not re.search(ntob(re.escape(pattern), self.encoding),
                           traceback_text):
            msg = 'Error page does not contain %s in traceback'
            self._handlewebError(msg % repr(pattern))

    # Default tolerance, in seconds, used by assertEqualDates.
    date_tolerance = 2

    def assertEqualDates(self, dt1, dt2, seconds=None):
        """Assert abs(dt1 - dt2) is within Y seconds.

        Y is ``seconds``, or self.date_tolerance when seconds is None.
        """
        if seconds is None:
            seconds = self.date_tolerance

        diff = abs(dt1 - dt2)
        if diff >= datetime.timedelta(seconds=seconds):
            raise AssertionError('%r and %r are not within %r seconds.' %
                                 (dt1, dt2, seconds))
def setup_client():
    """Set up the WebCase classes to match the server's socket settings."""
    server = cherrypy.server
    webtest.WebCase.PORT = server.socket_port
    webtest.WebCase.HOST = server.socket_host
    # A configured certificate means the client has to use https.
    if server.ssl_certificate:
        CPWebCase.scheme = 'https'
369 # --------------------------- Spawning helpers --------------------------- #
class CPProcess(object):
    """Spawn and control a cherryd subprocess for tests."""

    pid_file = os.path.join(thisdir, 'test.pid')
    config_file = os.path.join(thisdir, 'test.conf')
    config_template = """[global]
server.socket_host: '%(host)s'
server.socket_port: %(port)s
checker.on: False
log.screen: False
log.error_file: r'%(error_log)s'
log.access_file: r'%(access_log)s'
%(ssl)s
%(extra)s
"""
    error_log = os.path.join(thisdir, 'test.error.log')
    access_log = os.path.join(thisdir, 'test.access.log')

    def __init__(self, wait=False, daemonize=False, ssl=False, socket_host=None, socket_port=None):
        """Record how the subprocess should be run.

        wait: if true, start() blocks until the subprocess exits and stores
            its exit code in self.exit_code.
        daemonize: if true, cherryd is started with '-d'.
        ssl: if true, write_conf() adds the certificate/private key entries.
        socket_host, socket_port: address for the subprocess; a false value
            falls back to the corresponding cherrypy.server setting.
        """
        self.wait = wait
        self.daemonize = daemonize
        self.ssl = ssl
        self.host = socket_host or cherrypy.server.socket_host
        self.port = socket_port or cherrypy.server.socket_port

    def write_conf(self, extra=""):
        """Render config_template and write it to config_file.

        extra: additional config text appended after the generated entries.
        """
        if self.ssl:
            serverpem = os.path.join(thisdir, 'test.pem')
            ssl = """
server.ssl_certificate: r'%s'
server.ssl_private_key: r'%s'
""" % (serverpem, serverpem)
        else:
            ssl = ""

        conf = self.config_template % {
            'host': self.host,
            'port': self.port,
            'error_log': self.error_log,
            'access_log': self.access_log,
            'ssl': ssl,
            'extra': extra,
        }
        f = open(self.config_file, 'wb')
        try:
            f.write(ntob(conf, 'utf-8'))
        finally:
            # Close the file even if encoding or writing fails.
            f.close()

    def start(self, imports=None):
        """Start cherryd in a subprocess.

        imports: a module name, or a list/tuple of module names, each passed
            to cherryd with '-i'. False entries are skipped.
        """
        cherrypy._cpserver.wait_for_free_port(self.host, self.port)

        args = [sys.executable, os.path.join(thisdir, '..', 'cherryd'),
                '-c', self.config_file, '-p', self.pid_file]

        if not isinstance(imports, (list, tuple)):
            imports = [imports]
        for i in imports:
            if i:
                args.extend(['-i', i])

        if self.daemonize:
            args.append('-d')

        env = os.environ.copy()
        # Make sure we import the cherrypy package in which this module is defined.
        grandparentdir = os.path.abspath(os.path.join(thisdir, '..', '..'))
        if env.get('PYTHONPATH', ''):
            env['PYTHONPATH'] = os.pathsep.join((grandparentdir, env['PYTHONPATH']))
        else:
            env['PYTHONPATH'] = grandparentdir

        if self.wait:
            self.exit_code = os.spawnve(os.P_WAIT, sys.executable, args, env)
        else:
            os.spawnve(os.P_NOWAIT, sys.executable, args, env)
            cherrypy._cpserver.wait_for_occupied_port(self.host, self.port)

        # Give the engine a wee bit more time to finish STARTING
        if self.daemonize:
            time.sleep(2)
        else:
            time.sleep(1)

    def get_pid(self):
        """Return the integer process id stored in pid_file."""
        f = open(self.pid_file, 'rb')
        try:
            return int(f.read())
        finally:
            # Do not leave the pid file handle open.
            f.close()

    def join(self):
        """Wait for the process to exit.

        An OSError reporting that there is no child process left to wait
        for is ignored; any other OSError is re-raised.
        """
        import errno

        try:
            try:
                # Mac, UNIX
                os.wait()
            except AttributeError:
                # Windows
                try:
                    pid = self.get_pid()
                except IOError:
                    # Assume the subprocess deleted the pidfile on shutdown.
                    pass
                else:
                    os.waitpid(pid, 0)
        except OSError:
            x = sys.exc_info()[1]
            # Compare the error number rather than the (number, message)
            # pair: the message text is not the same on every platform.
            if x.errno != errno.ECHILD:
                raise