Update decimal test data to the most recent set from Mike Cowlishaw.
[python.git] / Lib / test / test_httpservers.py
blob228d82b64c802a0c5b70f1cca0898764cf7a9623
1 """Unittests for the various HTTPServer modules.
3 Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4 Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5 """
7 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8 from SimpleHTTPServer import SimpleHTTPRequestHandler
9 from CGIHTTPServer import CGIHTTPRequestHandler
10 import CGIHTTPServer
12 import os
13 import sys
14 import base64
15 import shutil
16 import urllib
17 import httplib
18 import tempfile
19 import threading
21 import unittest
22 from test import test_support
class NoLogRequestHandler:
    """Mixin that silences the logging of an HTTP request handler.

    The handler classes in this file list it ahead of the real handler
    base class so that this ``log_message`` is the one that gets used.
    """

    def log_message(self, *args):
        """Accept any arguments and discard them.

        Nothing is written, so no log messages end up on stderr.
        """
        return None
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on behalf of one test.

    The thread talks to the owning test through two attributes of
    ``test_object``:

    * ``lock`` -- acquired in the constructor and released by ``run()``
      once start-up is over, so the test can block on it to wait for the
      server.
    * ``PORT`` -- set by ``run()`` to the port the server socket is bound
      to.

    Arguments:
        test_object: object owning the ``lock`` described above.
        request_handler: handler class passed to ``HTTPServer``.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object
        # Remains None until run() has created the server, which lets
        # stop() tell whether there is anything to shut down.
        self.server = None
        self.test_object.lock.acquire()

    def run(self):
        """Create the server, publish its port, then serve until stopped."""
        try:
            # Bind to port 0 and read the real port back from the socket.
            self.server = HTTPServer(('', 0), self.request_handler)
            self.test_object.PORT = self.server.socket.getsockname()[1]
        finally:
            # Release even when start-up failed: a test waiting on the lock
            # should fail because PORT is missing, not hang forever.
            self.test_object.lock.release()
        try:
            self.server.serve_forever()
        finally:
            self.server.server_close()

    def stop(self):
        """Ask the serving loop to exit; a no-op if no server was created."""
        if self.server is not None:
            self.server.shutdown()
class BaseTestCase(unittest.TestCase):
    """Common fixture that serves ``self.request_handler`` from a thread.

    Subclasses supply a ``request_handler`` class attribute.  ``setUp``
    starts a TestServerThread for it; the thread stores the bound port in
    ``self.PORT``.
    """

    def setUp(self):
        self.lock = threading.Lock()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # The thread acquired the lock in its constructor and releases it
        # from run(), so this blocks until the server has been set up.
        self.lock.acquire()

    def tearDown(self):
        self.lock.release()
        self.thread.stop()
        # Wait for the thread to finish so that its server socket has been
        # closed before the next test starts.
        self.thread.join()

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        Arguments:
            uri: request target passed to ``HTTPConnection.request``.
            method: HTTP method name (default ``'GET'``).
            body: optional request body.
            headers: optional dict of extra request headers.

        The connection is kept in ``self.connection``.
        """
        # A fresh dict per call; a ``{}`` default would be shared by every
        # call of this method.
        if headers is None:
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
class BaseHTTPServerTestCase(BaseTestCase):
    """Checks the status codes BaseHTTPRequestHandler answers with.

    Every test talks to the nested ``request_handler`` through ``self.con``,
    a connection opened in ``setUp`` and closed again in ``tearDown``.  The
    handler implements only the made-up methods TEST, KEEP, KEYERROR and
    CUSTOM; it has no ``do_GET``.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            # 999 is not a standard status code.
            self.send_error(999)

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def tearDown(self):
        # Close the client side first so the connection opened in setUp is
        # not left dangling when the server is shut down.
        try:
            self.con.close()
        finally:
            BaseTestCase.tearDown(self)

    def test_command(self):
        # The handler has no do_GET.
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_request_line_trimming(self):
        # A stray newline after the version string.
        self.con._http_vsn_str = 'HTTP/1.1\n'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_version_bogus(self):
        self.con._http_vsn_str = 'FUBAR'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_digits(self):
        self.con._http_vsn_str = 'HTTP/9.9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_none_get(self):
        # No version string at all, with GET.
        self.con._http_vsn_str = ''
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_version_none(self):
        # No version string at all, with a method other than GET.
        self.con._http_vsn_str = ''
        self.con.putrequest('PUT', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.con._http_vsn_str = 'HTTP/9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 505)

    def test_send_blank(self):
        # Empty method, empty path and empty version.
        self.con._http_vsn_str = ''
        self.con.putrequest('', '')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_head_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 204)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        # Send a second request on the same connection; its response is
        # deliberately not read.
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests SimpleHTTPRequestHandler against a temporary directory.

    ``setUp`` changes into the system temporary directory and creates a
    fresh sub-directory there holding a single file named ``test``, so the
    request paths used below are relative to the temporary directory.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Removing the directory is best effort; only file system
                # errors are ignored.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Assert the status of ``response`` and, optionally, its body.

        Arguments:
            response: response object returned by ``self.request``.
            status: expected status code.
            data: expected body; the body is only compared when ``data``
                is truthy.
        """
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertTrue(response.reason is not None)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        # The paths are relative to the root directory of the HTTPServer.
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)

        # Create an empty index.html and close it straight away so that no
        # file handle is left open.
        open(os.path.join(self.tempdir, 'index.html'), 'w').close()
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)

        # chmod won't work as expected on Windows platforms, and permission
        # bits are not enforced for root, so skip the check in both cases.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Restore access even when the assertion fails; otherwise
                # tearDown cannot remove the directory.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/test', method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # requests must be case sensitive, so this should fail too
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
# Template for a minimal Python 2 CGI script.  The %s placeholder is the
# interpreter path for the "#!" line; CGIHTTPServerTestCase.setUp fills it
# in with sys.executable.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""
# Template for a Python 2 CGI script that echoes the form fields "spam",
# "eggs" and "bacon".  The single %s placeholder is the interpreter path
# for the "#!" line; every other percent sign is doubled so that it
# survives the formatting done in CGIHTTPServerTestCase.setUp.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
              form.getfirst("bacon"))
"""
class CGIHTTPServerTestCase(BaseTestCase):
    """Runs CGIHTTPRequestHandler against scripts in a temporary cgi-bin.

    ``setUp`` creates a temporary directory containing ``cgi-bin/file1.py``
    and ``cgi-bin/file2.py`` (generated from ``cgi_file1`` and
    ``cgi_file2``) and makes that directory the current one.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    def _install_script(self, filename, template):
        # Write ``template``, with the interpreter path filled in, to
        # ``filename`` inside the cgi-bin directory, set its mode to 0777
        # and return the path of the new file.
        script_path = os.path.join(self.cgi_dir, filename)
        with open(script_path, 'w') as script:
            script.write(template % sys.executable)
        os.chmod(script_path, 0o777)
        return script_path

    def setUp(self):
        BaseTestCase.setUp(self)
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        self.file1_path = self._install_script('file1.py', cgi_file1)
        self.file2_path = self._install_script('file2.py', cgi_file2)

        self.cwd = os.getcwd()
        os.chdir(self.parent_dir)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Files first, then the directories from the inside out.
            for script_path in (self.file1_path, self.file2_path):
                os.remove(script_path)
            for directory in (self.cgi_dir, self.parent_dir):
                os.rmdir(directory)
        finally:
            BaseTestCase.tearDown(self)

    def test_url_collapse_path_split(self):
        # Maps an input path either to the expected (head, tail) result or
        # to the exception class the call is expected to raise.
        test_vectors = {
            '': ('/', ''),
            '..': IndexError,
            '/.//..': IndexError,
            '/': ('/', ''),
            '//': ('/', ''),
            '/\\': ('/', '\\'),
            '/.//': ('/', ''),
            'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
            'a': ('/', 'a'),
            '/a': ('/', 'a'),
            '//a': ('/', 'a'),
            './a': ('/', 'a'),
            './C:/': ('/C:', ''),
            '/a/b': ('/a', 'b'),
            '/a/b/': ('/a/b', ''),
            '/a/b/c/..': ('/a/b', ''),
            '/a/b/c/../d': ('/a/b', 'd'),
            '/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
            '/a/b/c/../d/e/../../f': ('/a/b', 'f'),
            '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
            '../a/b/c/../d/e/.././././..//f': IndexError,
            '/a/b/c/../d/e/../../../f': ('/a', 'f'),
            '/a/b/c/../d/e/../../../../f': ('/', 'f'),
            '/a/b/c/../d/e/../../../../../f': IndexError,
            '/a/b/c/../d/e/../../../../f/..': ('/', ''),
        }
        collapse = CGIHTTPServer._url_collapse_path_split
        for path, expected in test_vectors.items():
            expects_error = (isinstance(expected, type) and
                             issubclass(expected, Exception))
            if expects_error:
                self.assertRaises(expected, collapse, path)
                continue
            actual = collapse(path)
            self.assertEqual(expected, actual,
                             msg='path = %r\nGot: %r\nWanted: %r' % (
                                 path, actual, expected))

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        expected = ('Hello World\n', 'text/html', 200)
        actual = (res.read(), res.getheader('Content-type'), res.status)
        self.assertEqual(expected, actual)

    def test_post(self):
        form_fields = {'spam': 1, 'eggs': 'python', 'bacon': 123456}
        params = urllib.urlencode(form_fields)
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        credentials = base64.b64encode('username:pass')
        headers = {'Authorization': 'Basic %s' % credentials}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        expected = ('Hello World\n', 'text/html', 200)
        actual = (res.read(), res.getheader('Content-type'), res.status)
        self.assertEqual(expected, actual)

    def test_no_leading_slash(self):
        # http://bugs.python.org/issue2254
        res = self.request('cgi-bin/file1.py')
        expected = ('Hello World\n', 'text/html', 200)
        actual = (res.read(), res.getheader('Content-type'), res.status)
        self.assertEqual(expected, actual)
def test_main(verbose=None):
    """Run the three test case classes of this module.

    The working directory in effect on entry is restored afterwards,
    because some of the tests change it.  ``verbose`` is accepted but not
    used.
    """
    # Read the directory before entering the try block: if os.getcwd()
    # itself fails, the finally clause must not trip over an unbound name.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase
                                  )
    finally:
        os.chdir(cwd)
# Run the whole suite when this file is executed as a script.
if "__main__" == __name__:
    test_main()