# Lib/test/test_urllib2_localnet.py
#!/usr/bin/env python

import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib

from test import test_support

# mimetools is deprecated; import it through the helper so the
# DeprecationWarning is suppressed during the test run.
mimetools = test_support.import_module('mimetools', deprecated=True)
# Skip the whole module cleanly if the platform has no threading.
threading = test_support.import_module('threading')
# Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Returns a (request_socket, client_address) pair, the same
        contract as the base class, but with a per-connection timeout
        applied to the accepted socket.
        """

        request, client_address = self.socket.accept()

        # It's a loopback connection, so setting the timeout
        # really low shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop = False
        # Set once the server loop has started; callers wait on this
        # before issuing requests so they never race server startup.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 lets the OS pick a free ephemeral port.
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        self.join()

    def run(self):
        self.ready.set()
        # handle_request() returns after each request (or after the
        # listening socket's 1s timeout), so the stop flag is polled
        # at least once a second.
        while not self._stop:
            self.httpd.handle_request()
# Authentication infrastructure
class DigestAuthHandler:
    """Handler for performing digest authentication (RFC 2617)."""

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the quality-of-protection value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the {username: password} dict of valid credentials."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        # Each nonce is single-use: it is remembered here and removed
        # from self._nonces when a client presents it.
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a 'Digest k1="v1", k2=v2, ...' header into a dict.

        The leading scheme token (everything up to the first space) is
        discarded; quoted values are unquoted, unquoted values are
        stripped of whitespace.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Recompute the RFC 2617 digest response and compare it with
        the client's 'response' field.  Returns True on a match.
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge with a fresh nonce; always returns False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """
        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if auth_dict["username"] in self._users:
                password = self._users[ auth_dict["username"] ]
            else:
                return self._return_auth_challenge(request_handler)
            # Reject unknown nonces; consume known ones (single use).
            if not auth_dict.get("nonce") in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True
# Proxy test infrastructure
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  It main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # A proxy receives absolute URLs; keep just the path component
        # so DigestAuthHandler can validate against either form.
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")
# Test cases
class BaseTestCase(unittest.TestCase):
    """Base class that checks for dangling threads around each test."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
class ProxyAuthTests(BaseTestCase):
    """Digest proxy-authentication tests against a local fake proxy."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        # Bind the shared DigestAuthHandler into every handler instance.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response so the connection completes cleanly.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()
def GetRequestHandler(responses):
    """Return a BaseHTTPRequestHandler subclass that serves canned replies.

    *responses* is a list of (response_code, headers, body) tuples,
    consumed one per request in order.  Header values may contain a
    '%s' which is filled in with the class attribute ``port``.  The
    class records every request path (and POST body) in ``requests``
    and the last request's headers in ``headers_received``.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Class-level so the test can inspect them after the fact.
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            # No body: finish the headers and return None implicitly.
            self.end_headers()

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
340 class TestUrlopen(BaseTestCase):
341 """Tests urllib2.urlopen using the network.
343 These tests are not exhaustive. Assuming that testing using files does a
344 good job overall of some of the basic interface features. There are no
345 tests exercising the optional 'data' and 'proxies' arguments. No tests
346 for transparent redirection have been written.
349 def start_server(self, responses):
350 handler = GetRequestHandler(responses)
352 self.server = LoopbackHttpServerThread(handler)
353 self.server.start()
354 self.server.ready.wait()
355 port = self.server.port
356 handler.port = port
357 return handler
360 def test_redirection(self):
361 expected_response = 'We got here...'
362 responses = [
363 (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
364 (200, [], expected_response)
367 handler = self.start_server(responses)
369 try:
370 f = urllib2.urlopen('http://localhost:%s/' % handler.port)
371 data = f.read()
372 f.close()
374 self.assertEquals(data, expected_response)
375 self.assertEquals(handler.requests, ['/', '/somewhere_else'])
376 finally:
377 self.server.stop()
380 def test_404(self):
381 expected_response = 'Bad bad bad...'
382 handler = self.start_server([(404, [], expected_response)])
384 try:
385 try:
386 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
387 except urllib2.URLError, f:
388 pass
389 else:
390 self.fail('404 should raise URLError')
392 data = f.read()
393 f.close()
395 self.assertEquals(data, expected_response)
396 self.assertEquals(handler.requests, ['/weeble'])
397 finally:
398 self.server.stop()
401 def test_200(self):
402 expected_response = 'pycon 2008...'
403 handler = self.start_server([(200, [], expected_response)])
405 try:
406 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
407 data = f.read()
408 f.close()
410 self.assertEquals(data, expected_response)
411 self.assertEquals(handler.requests, ['/bizarre'])
412 finally:
413 self.server.stop()
415 def test_200_with_parameters(self):
416 expected_response = 'pycon 2008...'
417 handler = self.start_server([(200, [], expected_response)])
419 try:
420 f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
421 data = f.read()
422 f.close()
424 self.assertEquals(data, expected_response)
425 self.assertEquals(handler.requests, ['/bizarre', 'get=with_feeling'])
426 finally:
427 self.server.stop()
430 def test_sending_headers(self):
431 handler = self.start_server([(200, [], "we don't care")])
433 try:
434 req = urllib2.Request("http://localhost:%s/" % handler.port,
435 headers={'Range': 'bytes=20-39'})
436 urllib2.urlopen(req)
437 self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
438 finally:
439 self.server.stop()
441 def test_basic(self):
442 handler = self.start_server([(200, [], "we don't care")])
444 try:
445 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
446 for attr in ("read", "close", "info", "geturl"):
447 self.assertTrue(hasattr(open_url, attr), "object returned from "
448 "urlopen lacks the %s attribute" % attr)
449 try:
450 self.assertTrue(open_url.read(), "calling 'read' failed")
451 finally:
452 open_url.close()
453 finally:
454 self.server.stop()
456 def test_info(self):
457 handler = self.start_server([(200, [], "we don't care")])
459 try:
460 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
461 info_obj = open_url.info()
462 self.assertIsInstance(info_obj, mimetools.Message,
463 "object returned by 'info' is not an "
464 "instance of mimetools.Message")
465 self.assertEqual(info_obj.getsubtype(), "plain")
466 finally:
467 self.server.stop()
469 def test_geturl(self):
470 # Make sure same URL as opened is returned by geturl.
471 handler = self.start_server([(200, [], "we don't care")])
473 try:
474 open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
475 url = open_url.geturl()
476 self.assertEqual(url, "http://localhost:%s" % handler.port)
477 finally:
478 self.server.stop()
481 def test_bad_address(self):
482 # Make sure proper exception is raised when connecting to a bogus
483 # address.
484 self.assertRaises(IOError,
485 # Given that both VeriSign and various ISPs have in
486 # the past or are presently hijacking various invalid
487 # domain name requests in an attempt to boost traffic
488 # to their own sites, finding a domain name to use
489 # for this test is difficult. RFC2606 leads one to
490 # believe that '.invalid' should work, but experience
491 # seemed to indicate otherwise. Single character
492 # TLDs are likely to remain invalid, so this seems to
493 # be the best choice. The trailing '.' prevents a
494 # related problem: The normal DNS resolver appends
495 # the domain names from the search path if there is
496 # no '.' the end and, and if one of those domains
497 # implements a '*' rule a result is returned.
498 # However, none of this will prevent the test from
499 # failing if the ISP hijacks all invalid domain
500 # requests. The real solution would be to be able to
501 # parameterize the framework with a mock resolver.
502 urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
def test_main():
    """Run both test case classes via the regrtest harness."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)
if __name__ == "__main__":
    test_main()