1 from __future__
import nested_scopes
# Backward compat for 2.1
2 from unittest
import TestCase
3 from wsgiref
.util
import setup_testing_defaults
4 from wsgiref
.headers
import Headers
5 from wsgiref
.handlers
import BaseHandler
, BaseCGIHandler
6 from wsgiref
import util
7 from wsgiref
.validate
import validator
8 from wsgiref
.simple_server
import WSGIServer
, WSGIRequestHandler
, demo_app
9 from wsgiref
.simple_server
import make_server
10 from StringIO
import StringIO
11 from SocketServer
import BaseServer
16 from test
import test_support
18 class MockServer(WSGIServer
):
19 """Non-socket HTTP server"""
21 def __init__(self
, server_address
, RequestHandlerClass
):
22 BaseServer
.__init
__(self
, server_address
, RequestHandlerClass
)
25 def server_bind(self
):
26 host
, port
= self
.server_address
27 self
.server_name
= host
28 self
.server_port
= port
32 class MockHandler(WSGIRequestHandler
):
33 """Non-socket HTTP handler"""
35 self
.connection
= self
.request
36 self
.rfile
, self
.wfile
= self
.connection
45 def hello_app(environ
,start_response
):
46 start_response("200 OK", [
47 ('Content-Type','text/plain'),
48 ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
50 return ["Hello, world!"]
52 def run_amock(app
=hello_app
, data
="GET / HTTP/1.0\n\n"):
53 server
= make_server("", 80, app
, MockServer
, MockHandler
)
54 inp
, out
, err
, olderr
= StringIO(data
), StringIO(), StringIO(), sys
.stderr
58 server
.finish_request((inp
,out
), ("127.0.0.1",8888))
62 return out
.getvalue(), err
.getvalue()
86 def compare_generic_iter(make_it
,match
):
87 """Utility to compare a generic 2.1/2.2+ iterator with an iterable
89 If running under Python 2.2+, this tests the iterator using iter()/next(),
90 as well as __getitem__. 'make_it' must be a function returning a fresh
91 iterator to be tested (since this may test the iterator twice)."""
96 if not it
[n
]==item
: raise AssertionError
103 raise AssertionError("Too many items from __getitem__",it
)
110 # Only test iter mode under 2.2+
112 if not iter(it
) is it
: raise AssertionError
114 if not it
.next()==item
: raise AssertionError
117 except StopIteration:
120 raise AssertionError("Too many items from .next()",it
)
127 class IntegrationTests(TestCase
):
129 def check_hello(self
, out
, has_length
=True):
130 self
.assertEqual(out
,
131 "HTTP/1.0 200 OK\r\n"
132 "Server: WSGIServer/0.1 Python/"+sys
.version
.split()[0]+"\r\n"
133 "Content-Type: text/plain\r\n"
134 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
135 (has_length
and "Content-Length: 13\r\n" or "") +
140 def test_plain_hello(self
):
141 out
, err
= run_amock()
142 self
.check_hello(out
)
144 def test_validated_hello(self
):
145 out
, err
= run_amock(validator(hello_app
))
146 # the middleware doesn't support len(), so content-length isn't there
147 self
.check_hello(out
, has_length
=False)
149 def test_simple_validation_error(self
):
150 def bad_app(environ
,start_response
):
151 start_response("200 OK", ('Content-Type','text/plain'))
152 return ["Hello, world!"]
153 out
, err
= run_amock(validator(bad_app
))
154 self
.assertTrue(out
.endswith(
155 "A server error occurred. Please contact the administrator."
158 err
.splitlines()[-2],
159 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
160 " be of type list: <type 'tuple'>"
168 class UtilityTests(TestCase
):
170 def checkShift(self
,sn_in
,pi_in
,part
,sn_out
,pi_out
):
171 env
= {'SCRIPT_NAME':sn_in
,'PATH_INFO':pi_in
}
172 util
.setup_testing_defaults(env
)
173 self
.assertEqual(util
.shift_path_info(env
),part
)
174 self
.assertEqual(env
['PATH_INFO'],pi_out
)
175 self
.assertEqual(env
['SCRIPT_NAME'],sn_out
)
178 def checkDefault(self
, key
, value
, alt
=None):
179 # Check defaulting when empty
181 util
.setup_testing_defaults(env
)
182 if isinstance(value
, StringIO
):
183 self
.assertIsInstance(env
[key
], StringIO
)
185 self
.assertEqual(env
[key
], value
)
187 # Check existing value
189 util
.setup_testing_defaults(env
)
190 self
.assertTrue(env
[key
] is alt
)
192 def checkCrossDefault(self
,key
,value
,**kw
):
193 util
.setup_testing_defaults(kw
)
194 self
.assertEqual(kw
[key
],value
)
196 def checkAppURI(self
,uri
,**kw
):
197 util
.setup_testing_defaults(kw
)
198 self
.assertEqual(util
.application_uri(kw
),uri
)
200 def checkReqURI(self
,uri
,query
=1,**kw
):
201 util
.setup_testing_defaults(kw
)
202 self
.assertEqual(util
.request_uri(kw
,query
),uri
)
209 def checkFW(self
,text
,size
,match
):
211 def make_it(text
=text
,size
=size
):
212 return util
.FileWrapper(StringIO(text
),size
)
214 compare_generic_iter(make_it
,match
)
217 self
.assertFalse(it
.filelike
.closed
)
222 self
.assertFalse(it
.filelike
.closed
)
225 self
.assertTrue(it
.filelike
.closed
)
228 def testSimpleShifts(self
):
229 self
.checkShift('','/', '', '/', '')
230 self
.checkShift('','/x', 'x', '/x', '')
231 self
.checkShift('/','', None, '/', '')
232 self
.checkShift('/a','/x/y', 'x', '/a/x', '/y')
233 self
.checkShift('/a','/x/', 'x', '/a/x', '/')
236 def testNormalizedShifts(self
):
237 self
.checkShift('/a/b', '/../y', '..', '/a', '/y')
238 self
.checkShift('', '/../y', '..', '', '/y')
239 self
.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
240 self
.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
241 self
.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
242 self
.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
243 self
.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
244 self
.checkShift('/a/b', '///', '', '/a/b/', '')
245 self
.checkShift('/a/b', '/.//', '', '/a/b/', '')
246 self
.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
247 self
.checkShift('/a/b', '/.', None, '/a/b', '')
250 def testDefaults(self
):
252 ('SERVER_NAME','127.0.0.1'),
253 ('SERVER_PORT', '80'),
254 ('SERVER_PROTOCOL','HTTP/1.0'),
255 ('HTTP_HOST','127.0.0.1'),
256 ('REQUEST_METHOD','GET'),
259 ('wsgi.version', (1,0)),
260 ('wsgi.run_once', 0),
261 ('wsgi.multithread', 0),
262 ('wsgi.multiprocess', 0),
263 ('wsgi.input', StringIO("")),
264 ('wsgi.errors', StringIO()),
265 ('wsgi.url_scheme','http'),
267 self
.checkDefault(key
,value
)
270 def testCrossDefaults(self
):
271 self
.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME
="foo.bar")
272 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="on")
273 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="1")
274 self
.checkCrossDefault('wsgi.url_scheme',"https",HTTPS
="yes")
275 self
.checkCrossDefault('wsgi.url_scheme',"http",HTTPS
="foo")
276 self
.checkCrossDefault('SERVER_PORT',"80",HTTPS
="foo")
277 self
.checkCrossDefault('SERVER_PORT',"443",HTTPS
="on")
280 def testGuessScheme(self
):
281 self
.assertEqual(util
.guess_scheme({}), "http")
282 self
.assertEqual(util
.guess_scheme({'HTTPS':"foo"}), "http")
283 self
.assertEqual(util
.guess_scheme({'HTTPS':"on"}), "https")
284 self
.assertEqual(util
.guess_scheme({'HTTPS':"yes"}), "https")
285 self
.assertEqual(util
.guess_scheme({'HTTPS':"1"}), "https")
291 def testAppURIs(self
):
292 self
.checkAppURI("http://127.0.0.1/")
293 self
.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME
="/spam")
294 self
.checkAppURI("http://spam.example.com:2071/",
295 HTTP_HOST
="spam.example.com:2071", SERVER_PORT
="2071")
296 self
.checkAppURI("http://spam.example.com/",
297 SERVER_NAME
="spam.example.com")
298 self
.checkAppURI("http://127.0.0.1/",
299 HTTP_HOST
="127.0.0.1", SERVER_NAME
="spam.example.com")
300 self
.checkAppURI("https://127.0.0.1/", HTTPS
="on")
301 self
.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT
="8000",
304 def testReqURIs(self
):
305 self
.checkReqURI("http://127.0.0.1/")
306 self
.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME
="/spam")
307 self
.checkReqURI("http://127.0.0.1/spammity/spam",
308 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam")
309 self
.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
310 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam",QUERY_STRING
="say=ni")
311 self
.checkReqURI("http://127.0.0.1/spammity/spam", 0,
312 SCRIPT_NAME
="/spammity", PATH_INFO
="/spam",QUERY_STRING
="say=ni")
314 def testFileWrapper(self
):
315 self
.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
317 def testHopByHop(self
):
319 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
320 "TE Trailers Transfer-Encoding Upgrade"
322 for alt
in hop
, hop
.title(), hop
.upper(), hop
.lower():
323 self
.assertTrue(util
.is_hop_by_hop(alt
))
325 # Not comprehensive, just a few random header names
327 "Accept Cache-Control Date Pragma Trailer Via Warning"
329 for alt
in hop
, hop
.title(), hop
.upper(), hop
.lower():
330 self
.assertFalse(util
.is_hop_by_hop(alt
))
332 class HeaderTests(TestCase
):
334 def testMappingInterface(self
):
336 self
.assertEqual(len(Headers([])),0)
337 self
.assertEqual(len(Headers(test
[:])),1)
338 self
.assertEqual(Headers(test
[:]).keys(), ['x'])
339 self
.assertEqual(Headers(test
[:]).values(), ['y'])
340 self
.assertEqual(Headers(test
[:]).items(), test
)
341 self
.assertFalse(Headers(test
).items() is test
) # must be copy!
344 del h
['foo'] # should not raise an error
347 for m
in h
.has_key
, h
.__contains
__, h
.get
, h
.get_all
, h
.__getitem
__:
348 self
.assertTrue(m('foo'))
349 self
.assertTrue(m('Foo'))
350 self
.assertTrue(m('FOO'))
351 self
.assertFalse(m('bar'))
353 self
.assertEqual(h
['foo'],'bar')
355 self
.assertEqual(h
['FOO'],'baz')
356 self
.assertEqual(h
.get_all('foo'),['baz'])
358 self
.assertEqual(h
.get("foo","whee"), "baz")
359 self
.assertEqual(h
.get("zoo","whee"), "whee")
360 self
.assertEqual(h
.setdefault("foo","whee"), "baz")
361 self
.assertEqual(h
.setdefault("zoo","whee"), "whee")
362 self
.assertEqual(h
["foo"],"baz")
363 self
.assertEqual(h
["zoo"],"whee")
365 def testRequireList(self
):
366 self
.assertRaises(TypeError, Headers
, "foo")
369 def testExtras(self
):
371 self
.assertEqual(str(h
),'\r\n')
373 h
.add_header('foo','bar',baz
="spam")
374 self
.assertEqual(h
['foo'], 'bar; baz="spam"')
375 self
.assertEqual(str(h
),'foo: bar; baz="spam"\r\n\r\n')
377 h
.add_header('Foo','bar',cheese
=None)
378 self
.assertEqual(h
.get_all('foo'),
379 ['bar; baz="spam"', 'bar; cheese'])
381 self
.assertEqual(str(h
),
382 'foo: bar; baz="spam"\r\n'
383 'Foo: bar; cheese\r\n'
388 class ErrorHandler(BaseCGIHandler
):
389 """Simple handler subclass for testing BaseHandler"""
391 # BaseHandler records the OS environment at import time, but envvars
392 # might have been changed later by other tests, which trips up
393 # HandlerTests.testEnviron().
394 os_environ
= dict(os
.environ
.items())
396 def __init__(self
,**kw
):
397 setup_testing_defaults(kw
)
398 BaseCGIHandler
.__init
__(
399 self
, StringIO(''), StringIO(), StringIO(), kw
,
400 multithread
=True, multiprocess
=True
403 class TestHandler(ErrorHandler
):
404 """Simple handler subclass for testing BaseHandler, w/error passthru"""
406 def handle_error(self
):
407 raise # for testing, we want to see what's happening
419 class HandlerTests(TestCase
):
421 def checkEnvironAttrs(self
, handler
):
422 env
= handler
.environ
424 'version','multithread','multiprocess','run_once','file_wrapper'
426 if attr
=='file_wrapper' and handler
.wsgi_file_wrapper
is None:
428 self
.assertEqual(getattr(handler
,'wsgi_'+attr
),env
['wsgi.'+attr
])
430 def checkOSEnviron(self
,handler
):
431 empty
= {}; setup_testing_defaults(empty
)
432 env
= handler
.environ
433 from os
import environ
434 for k
,v
in environ
.items():
436 self
.assertEqual(env
[k
],v
)
437 for k
,v
in empty
.items():
438 self
.assertIn(k
, env
)
440 def testEnviron(self
):
441 h
= TestHandler(X
="Y")
443 self
.checkEnvironAttrs(h
)
444 self
.checkOSEnviron(h
)
445 self
.assertEqual(h
.environ
["X"],"Y")
447 def testCGIEnviron(self
):
448 h
= BaseCGIHandler(None,None,None,{})
450 for key
in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
451 self
.assertIn(key
, h
.environ
)
453 def testScheme(self
):
454 h
=TestHandler(HTTPS
="on"); h
.setup_environ()
455 self
.assertEqual(h
.environ
['wsgi.url_scheme'],'https')
456 h
=TestHandler(); h
.setup_environ()
457 self
.assertEqual(h
.environ
['wsgi.url_scheme'],'http')
460 def testAbstractMethods(self
):
463 '_flush','get_stdin','get_stderr','add_cgi_vars'
465 self
.assertRaises(NotImplementedError, getattr(h
,name
))
466 self
.assertRaises(NotImplementedError, h
._write
, "test")
469 def testContentLength(self
):
470 # Demo one reason iteration is better than write()... ;)
472 def trivial_app1(e
,s
):
474 return [e
['wsgi.url_scheme']]
476 def trivial_app2(e
,s
):
477 s('200 OK',[])(e
['wsgi.url_scheme'])
482 self
.assertEqual(h
.stdout
.getvalue(),
484 "Content-Length: 4\r\n"
490 self
.assertEqual(h
.stdout
.getvalue(),
501 def testBasicErrorOutput(self
):
503 def non_error_app(e
,s
):
508 raise AssertionError("This should be caught by handler")
512 self
.assertEqual(h
.stdout
.getvalue(),
514 "Content-Length: 0\r\n"
516 self
.assertEqual(h
.stderr
.getvalue(),"")
520 self
.assertEqual(h
.stdout
.getvalue(),
522 "Content-Type: text/plain\r\n"
523 "Content-Length: %d\r\n"
524 "\r\n%s" % (h
.error_status
,len(h
.error_body
),h
.error_body
))
526 self
.assertNotEqual(h
.stderr
.getvalue().find("AssertionError"), -1)
528 def testErrorAfterOutput(self
):
529 MSG
= "Some output has been sent"
532 raise AssertionError("This should be caught by handler")
536 self
.assertEqual(h
.stdout
.getvalue(),
539 self
.assertNotEqual(h
.stderr
.getvalue().find("AssertionError"), -1)
542 def testHeaderFormats(self
):
544 def non_error_app(e
,s
):
549 r
"HTTP/%s 200 OK\r\n"
550 r
"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
551 r
"%s" r
"Content-Length: 0\r\n" r
"\r\n"
554 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
557 for ssw
in "FooBar/1.0", None:
558 sw
= ssw
and "Server: %s\r\n" % ssw
or ""
560 for version
in "1.0", "1.1":
561 for proto
in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
563 h
= TestHandler(SERVER_PROTOCOL
=proto
)
564 h
.origin_server
= False
565 h
.http_version
= version
566 h
.server_software
= ssw
568 self
.assertEqual(shortpat
,h
.stdout
.getvalue())
570 h
= TestHandler(SERVER_PROTOCOL
=proto
)
571 h
.origin_server
= True
572 h
.http_version
= version
573 h
.server_software
= ssw
575 if proto
=="HTTP/0.9":
576 self
.assertEqual(h
.stdout
.getvalue(),"")
579 re
.match(stdpat
%(version
,sw
), h
.stdout
.getvalue()),
580 (stdpat
%(version
,sw
), h
.stdout
.getvalue())
583 # This epilogue is needed for compatibility with the Python 2.5 regrtest module
586 test_support
.run_unittest(__name__
)
588 if __name__
== "__main__":
619 # the above lines intentionally left blank