1 """Test the various means of instantiating and invoking tools."""
5 from cherrypy
._cpcompat
import BytesIO
, copyitems
, itervalues
, IncompleteRead
, ntob
, ntou
, xrange
11 from cherrypy
import tools
14 europoundUnicode
= ntou('\x80\xa3')
19 from cherrypy
.test
import helper
22 class ToolTests(helper
.CPWebCase
):
25 # Put check_access in a custom toolbox with its own namespace
26 myauthtools
= cherrypy
._cptools
.Toolbox("myauth")
28 def check_access(default
=False):
29 if not getattr(cherrypy
.request
, "userid", default
):
30 raise cherrypy
.HTTPError(401)
31 myauthtools
.check_access
= cherrypy
.Tool('before_request_body', check_access
)
36 for k
, v
in cherrypy
.request
.numerify_map
:
37 chunk
= chunk
.replace(k
, v
)
39 cherrypy
.response
.body
= number_it(cherrypy
.response
.body
)
41 class NumTool(cherrypy
.Tool
):
44 m
= self
._merged
_args
().get("map", {})
45 cherrypy
.request
.numerify_map
= copyitems(m
)
46 cherrypy
.request
.hooks
.attach('on_start_resource', makemap
)
49 cherrypy
.request
.error_response
= cherrypy
.HTTPError(502).set_response
50 critical
.failsafe
= True
52 cherrypy
.request
.hooks
.attach('on_start_resource', critical
)
53 cherrypy
.request
.hooks
.attach(self
._point
, self
.callable)
55 tools
.numerify
= NumTool('before_finalize', numerify
)
57 # It's not mandatory to inherit from cherrypy.Tool.
65 def nadsat_it_up(body
):
67 chunk
= chunk
.replace(ntob("good"), ntob("horrorshow"))
68 chunk
= chunk
.replace(ntob("piece"), ntob("lomtick"))
70 cherrypy
.response
.body
= nadsat_it_up(cherrypy
.response
.body
)
74 # This runs after the request has been completely written out.
75 cherrypy
.response
.body
= [ntob("razdrez")]
76 id = cherrypy
.request
.params
.get("id")
79 cleanup
.failsafe
= True
82 cherrypy
.request
.hooks
.attach('before_finalize', self
.nadsat
)
83 cherrypy
.request
.hooks
.attach('on_end_request', self
.cleanup
)
84 tools
.nadsat
= NadsatTool()
87 cherrypy
.request
.process_request_body
= False
88 clen
= int(cherrypy
.request
.headers
['Content-Length'])
89 cherrypy
.request
.body
= cherrypy
.request
.rfile
.read(clen
)
91 # Assert that we can use a callable object instead of a function.
92 class Rotator(object):
93 def __call__(self
, scale
):
96 r
.body
= [chr((ord(x
) + scale
) % 256) for x
in r
.body
[0]]
97 cherrypy
.tools
.rotator
= cherrypy
.Tool('before_finalize', Rotator())
99 def stream_handler(next_handler
, *args
, **kwargs
):
100 cherrypy
.response
.output
= o
= BytesIO()
102 response
= next_handler(*args
, **kwargs
)
103 # Ignore the response and return our accumulated output instead.
107 cherrypy
.tools
.streamer
= cherrypy
._cptools
.HandlerWrapperTool(stream_handler
)
111 return "Howdy earth!"
115 cherrypy
.response
.output
.write(ntob('I am '))
116 cherrypy
.response
.output
.write(ntob('a tarfile'))
117 tarfile
.exposed
= True
118 tarfile
._cp
_config
= {'tools.streamer.on': True}
121 hooks
= list(cherrypy
.request
.hooks
['before_finalize'])
123 cbnames
= [x
.callback
.__name
__ for x
in hooks
]
124 assert cbnames
== ['gzip'], cbnames
125 priorities
= [x
.priority
for x
in hooks
]
126 assert priorities
== [80], priorities
129 yield europoundUnicode
134 return cherrypy
.request
.body
136 pipe
._cp
_config
= {'hooks.before_request_body': pipe_body
}
138 # Multiple decorators; include kwargs just for fun.
139 # Note that rotator must run before gzip.
140 def decorated_euro(self
, *vpath
):
143 yield europoundUnicode
144 decorated_euro
.exposed
= True
145 decorated_euro
= tools
.gzip(compress_level
=6)(decorated_euro
)
146 decorated_euro
= tools
.rotator(scale
=3)(decorated_euro
)
151 class TestType(type):
152 """Metaclass which automatically exposes all functions in each subclass,
153 and adds an instance of the subclass as an attribute of root.
155 def __init__(cls
, name
, bases
, dct
):
156 type.__init
__(cls
, name
, bases
, dct
)
157 for value
in itervalues(dct
):
158 if isinstance(value
, types
.FunctionType
):
160 setattr(root
, name
.lower(), cls())
162 __metaclass__
= TestType
166 # Declare Tools in _cp_config
169 _cp_config
= {"tools.nadsat.on": True}
171 def index(self
, id=None):
172 return "A good piece of cherry pie"
175 return repr(tools
.nadsat
.ended
[id])
177 def err(self
, id=None):
180 def errinstream(self
, id=None):
181 yield "nonconfidential"
185 # METHOD TWO: decorator using Tool()
186 # We support Python 2.3, but the @-deco syntax would look like this:
187 # @tools.check_access()
188 def restricted(self
):
190 restricted
= myauthtools
.check_access()(restricted
)
193 def err_in_onstart(self
):
196 def stream(self
, id=None):
197 for x
in xrange(100000000):
199 stream
._cp
_config
= {'response.stream': True}
204 # Declare Tools in detached config
206 'tools.numerify.on': True,
207 'tools.numerify.map': {ntob("pie"): ntob("3.14159")},
209 '/demo/restricted': {
210 'request.show_tracebacks': False,
213 'request.show_tracebacks': False,
214 'myauth.check_access.default': True,
216 '/demo/errinstream': {
217 'response.stream': True,
219 '/demo/err_in_onstart': {
220 # Because this isn't a dict, on_start_resource will error.
221 'tools.numerify.map': "pie->3.14159"
225 'tools.gzip.on': True,
226 'tools.encode.on': True,
228 # Priority specified in config
229 '/decorated_euro/subpath': {
230 'tools.gzip.priority': 10,
233 '/tarfile': {'tools.streamer.on': True}
235 app
= cherrypy
.tree
.mount(root
, config
=conf
)
236 app
.request_class
.namespaces
['myauth'] = myauthtools
238 if sys
.version_info
>= (2, 5):
239 from cherrypy
.test
import _test_decorators
240 root
.tooldecs
= _test_decorators
.ToolExamples()
241 setup_server
= staticmethod(setup_server
)
243 def testHookErrors(self
):
244 self
.getPage("/demo/?id=1")
245 # If body is "razdrez", then on_end_request is being called too early.
246 self
.assertBody("A horrorshow lomtick of cherry 3.14159")
247 # If this fails, then on_end_request isn't being called at all.
249 self
.getPage("/demo/ended/1")
250 self
.assertBody("True")
252 valerr
= '\n raise ValueError()\nValueError'
253 self
.getPage("/demo/err?id=3")
254 # If body is "razdrez", then on_end_request is being called too early.
255 self
.assertErrorPage(502, pattern
=valerr
)
256 # If this fails, then on_end_request isn't being called at all.
258 self
.getPage("/demo/ended/3")
259 self
.assertBody("True")
261 # If body is "razdrez", then on_end_request is being called too early.
262 if (cherrypy
.server
.protocol_version
== "HTTP/1.0" or
263 getattr(cherrypy
.server
, "using_apache", False)):
264 self
.getPage("/demo/errinstream?id=5")
265 # Because this error is raised after the response body has
266 # started, the status should not change to an error status.
267 self
.assertStatus("200 OK")
268 self
.assertBody("nonconfidential")
270 # Because this error is raised after the response body has
271 # started, and because it's chunked output, an error is raised by
272 # the HTTP client when it encounters incomplete output.
273 self
.assertRaises((ValueError, IncompleteRead
), self
.getPage
,
274 "/demo/errinstream?id=5")
275 # If this fails, then on_end_request isn't being called at all.
277 self
.getPage("/demo/ended/5")
278 self
.assertBody("True")
280 # Test the "__call__" technique (compile-time decorator).
281 self
.getPage("/demo/restricted")
282 self
.assertErrorPage(401)
284 # Test compile-time decorator with kwargs from config.
285 self
.getPage("/demo/userid")
286 self
.assertBody("Welcome!")
288 def testEndRequestOnDrop(self
):
291 httpserver
= cherrypy
.server
.httpserver
292 old_timeout
= httpserver
.timeout
293 except (AttributeError, IndexError):
297 httpserver
.timeout
= timeout
299 # Test that on_end_request is called even if the client drops.
300 self
.persistent
= True
302 conn
= self
.HTTP_CONN
303 conn
.putrequest("GET", "/demo/stream?id=9", skip_host
=True)
304 conn
.putheader("Host", self
.HOST
)
306 # Skip the rest of the request and close the conn. This will
307 # cause the server's active socket to error, which *should*
308 # result in the request being aborted, and request.close being
309 # called all the way up the stack (including WSGI middleware),
310 # eventually calling our on_end_request hook.
312 self
.persistent
= False
313 time
.sleep(timeout
* 2)
314 # Test that the on_end_request hook was called.
315 self
.getPage("/demo/ended/9")
316 self
.assertBody("True")
318 if old_timeout
is not None:
319 httpserver
.timeout
= old_timeout
321 def testGuaranteedHooks(self
):
322 # The 'critical' on_start_resource hook is 'failsafe' (guaranteed
323 # to run even if there are failures in other on_start methods).
324 # This is NOT true of the other hooks.
325 # Here, we have set up a failure in NumerifyTool.numerify_map,
326 # but our 'critical' hook should run and set the error to 502.
327 self
.getPage("/demo/err_in_onstart")
328 self
.assertErrorPage(502)
329 self
.assertInBody("AttributeError: 'str' object has no attribute 'items'")
331 def testCombinedTools(self
):
332 expectedResult
= (ntou("Hello,world") + europoundUnicode
).encode('utf-8')
334 zfile
= gzip
.GzipFile(mode
='wb', fileobj
=zbuf
, compresslevel
=9)
335 zfile
.write(expectedResult
)
338 self
.getPage("/euro", headers
=[("Accept-Encoding", "gzip"),
339 ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")])
340 self
.assertInBody(zbuf
.getvalue()[:3])
343 zfile
= gzip
.GzipFile(mode
='wb', fileobj
=zbuf
, compresslevel
=6)
344 zfile
.write(expectedResult
)
347 self
.getPage("/decorated_euro", headers
=[("Accept-Encoding", "gzip")])
348 self
.assertInBody(zbuf
.getvalue()[:3])
350 # This returns a different value because gzip's priority was
351 # lowered in conf, allowing the rotator to run after gzip.
352 # Of course, we don't want breakage in production apps,
353 # but it proves the priority was changed.
354 self
.getPage("/decorated_euro/subpath",
355 headers
=[("Accept-Encoding", "gzip")])
356 self
.assertInBody(''.join([chr((ord(x
) + 3) % 256) for x
in zbuf
.getvalue()]))
358 def testBareHooks(self
):
359 content
= "bit of a pain in me gulliver"
360 self
.getPage("/pipe",
361 headers
=[("Content-Length", str(len(content
))),
362 ("Content-Type", "text/plain")],
363 method
="POST", body
=content
)
364 self
.assertBody(content
)
366 def testHandlerWrapperTool(self
):
367 self
.getPage("/tarfile")
368 self
.assertBody("I am a tarfile")
370 def testToolWithConfig(self
):
371 if not sys
.version_info
>= (2, 5):
372 return self
.skip("skipped (Python 2.5+ only)")
374 self
.getPage('/tooldecs/blah')
375 self
.assertHeader('Content-Type', 'application/data')
377 def testWarnToolOn(self
):
380 numon
= cherrypy
.tools
.numerify
.on
381 except AttributeError:
384 raise AssertionError("Tool.on did not error as it should have.")
388 cherrypy
.tools
.numerify
.on
= True
389 except AttributeError:
392 raise AssertionError("Tool.on did not error as it should have.")