Change a variable type to avoid signed overflow; replace repeated '19999' constant...
[python.git] / Lib / test / test_urllib2_localnet.py
blob1375e112b8ed3982d7deb678c1c508eec4850df3
1 #!/usr/bin/env python
3 import mimetools
4 import threading
5 import urlparse
6 import urllib2
7 import BaseHTTPServer
8 import unittest
9 import hashlib
10 from test import test_support
12 # Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server with a few tweaks that make it convenient for
    loopback (127.0.0.1) testing.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)
        # A very short accept() timeout lets the serving thread notice
        # a stop request quickly.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden."""
        conn, peer = self.socket.accept()
        # The peer is on the same host, so this generous-but-finite
        # timeout should never fire in practice; it only guards
        # against deadlocks.
        conn.settimeout(10.0)
        return (conn, peer)
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback HTTP server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop = False              # polled by run(); set by stop()
        self.ready = threading.Event()  # signalled once the serve loop starts
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0), request_handler)
        # Port 0 above let the OS pick a free port; publish it for clients.
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""
        self._stop = True
        self.join()

    def run(self):
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()
67 # Authentication infrastructure
class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        self._request_num = 0           # counter used to derive fresh nonces
        self._nonces = []               # nonces issued but not yet consumed
        self._users = {}                # username -> password
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        # Derive a unique nonce from a monotonically increasing counter.
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a 'Digest k="v", k2=v2, ...' header value into a dict."""
        # Drop the leading scheme token (e.g. "Digest ").
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            # Quoted values keep interior whitespace; bare values are stripped.
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Recompute the RFC 2617 response digest and compare it with the
        one the client sent."""
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge carrying a fresh nonce; always returns False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """
        if len(self._users) == 0:
            return True

        # NOTE: has_key() was replaced by 'in' (same semantics, and
        # has_key is gone in Python 3).
        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization'])
            if auth_dict["username"] in self._users:
                password = self._users[auth_dict["username"]]
            else:
                return self._return_auth_challenge(request_handler)
            # Each nonce is single-use: consume it or re-challenge.
            if auth_dict.get("nonce") not in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.
            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True
189 # Proxy test infrastructure
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # Remember the path component alone; the digest handler tries it
        # as well as the full request path (MSIE vs urllib2 behavior).
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")
221 # Test cases
class BaseTestCase(unittest.TestCase):
    """Shared fixture that tracks stray helper threads across each test."""

    def setUp(self):
        # Snapshot the live-thread state so tearDown can wait out leftovers.
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
class ProxyAuthTests(BaseTestCase):
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        """Start a loopback proxy that demands digest authentication."""
        # Invoke the base fixture first: without this, the thread
        # bookkeeping BaseTestCase exists for is silently skipped.
        BaseTestCase.setUp(self)
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        # Let BaseTestCase verify no helper threads were leaked.
        BaseTestCase.tearDown(self)

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()
def GetRequestHandler(responses):
    """Build a BaseHTTPRequestHandler subclass that replays *responses*.

    Each element of *responses* is a (status, headers, body) triple;
    exactly one element is consumed per incoming request, in order.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []           # every path (and POST body) seen, in order
        headers_received = []   # headers of the most recent request
        port = 80

        def do_GET(self):
            payload = self.send_head()
            if payload:
                self.wfile.write(payload)

        def do_POST(self):
            size = int(self.headers['Content-Length'])
            data = self.rfile.read(size)
            self.do_GET()
            self.requests.append(data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, extra_headers, payload = responses.pop(0)

            self.send_response(status)

            for name, fmt in extra_headers:
                # Header values may embed '%s' for the server's port.
                self.send_header(name, fmt % self.port)
            if payload:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return payload
            self.end_headers()

        def log_message(self, *args):
            # Keep the test output quiet.
            pass

    return FakeHTTPRequestHandler
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def start_server(self, responses):
        """Start a loopback server replaying *responses*; return its handler
        class so tests can inspect the requests it recorded."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()

    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError as err:
                # Rebind explicitly so the response outlives the except
                # block regardless of exception-variable scoping rules.
                f = err
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()

    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr),
                                "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertTrue(isinstance(info_obj, mimetools.Message),
                            "object returned by 'info' is not an instance of "
                            "mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice.  The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
def test_main():
    # Deliberately independent of the "network" resource flag
    # (Lib/test/regrtest.py -u network): everything here talks only to
    # localhost.  Uncomment the next line if that rationale turns out
    # to be wrong.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()