import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib

from test import test_support
mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

# Loopback http server infrastructure

class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden."""

        request, client_address = self.socket.accept()

        # It's a loopback connection, so setting the timeout
        # really low shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)

class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""
        # Tell the loop in run() to exit, then wait for the thread to finish.
        self._stop = True
        self.join()

    def run(self):
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()
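
# A rough usage sketch of the server thread above (comments only, mirroring
# the test fixtures further down); "SomeRequestHandler" is a placeholder for
# a BaseHTTPRequestHandler subclass:
#
#     server = LoopbackHttpServerThread(SomeRequestHandler)
#     server.start()
#     server.ready.wait()            # once set, server.port is the bound port
#     url = "http://127.0.0.1:%d/" % server.port
#     ... issue requests against url ...
#     server.stop()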

# Authentication infrastructure

class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value

        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s", '
            'nonce="%s"' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization'])
            if auth_dict["username"] in self._users:
                password = self._users[auth_dict["username"]]
            else:
                return self._return_auth_challenge(request_handler)
            if auth_dict.get("nonce") not in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return auth_validated
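
# For reference, the value checked against auth_dict["response"] in
# _validate_auth above is the standard RFC 2617 digest (with qop), built
# from MD5 hex digests:
#
#     HA1      = MD5(username + ":" + realm + ":" + password)
#     HA2      = MD5(method + ":" + uri)
#     response = MD5(HA1 + ":" + nonce + ":" + nc + ":" +
#                    cnonce + ":" + qop + ":" + HA2)
#
# where nonce, nc and cnonce come from the client's Proxy-Authorization
# header.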

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(),
        # which will try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)

class ProxyAuthTests(BaseTestCase):
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http": proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain and close the response so the server thread can move on.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()

def GetRequestHandler(responses):

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
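
# Each entry in 'responses' is a (status_code, headers, body) tuple, and any
# '%s' in a header value is interpolated with the handler's port.  For
# example, the redirection test below uses:
#
#     responses = [
#         (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
#         (200, [], 'We got here...'),
#     ]
#     handler = GetRequestHandler(responses)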

class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  They assume that testing against local
    files already covers the basic interface features reasonably well.  There
    are no tests exercising the optional 'data' and 'proxies' arguments, and
    no tests for transparent redirection have been written.
    """

    def start_server(self, responses):
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()

    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                pass
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/weeble'])
        finally:
            self.server.stop()

    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEquals(data, expected_response)
            self.assertEquals(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()

    def test_bad_address(self):
        # Make sure the proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice.  The trailing '.' prevents a
                          # related problem: the normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()