10 from test
import test_support
12 # Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Accepts a connection from the listening socket and returns the
        ``(request, client_address)`` pair expected by the base class.
        """

        request, client_address = self.socket.accept()

        # It's a loopback connection, so setting the timeout
        # really low shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        # _stop is polled by run(); setting it True lets the thread exit
        # after at most one listening-socket timeout (1 second).
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        # Port 0 above means the OS picked a free port; expose it so tests
        # can build URLs against this server.
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        self.join()

    def run(self):
        # Signal waiters (e.g. setUp in the test cases) that the server
        # is bound and about to serve, then handle requests one at a time
        # until stop() flips the flag.
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()
67 # Authentication infrastructure
class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        # Monotonic counter used to derive fresh nonces.
        self._request_num = 0
        # Nonces we've issued and not yet consumed.
        self._nonces = []
        # username -> password; empty dict disables authentication.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        # Derive a nonce from the request counter and remember it so
        # handle_request() can later verify the client echoed a nonce
        # we actually issued.
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a 'Digest k1="v1", k2=v2, ...' header value into a dict."""
        # Drop the leading scheme token ("Digest ").
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            name, value = part.split("=")
            name = name.strip()
            # Quoted values keep inner whitespace; unquoted ones are stripped.
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Recompute the RFC 2617 digest response and compare it with the
        client's.  Returns True when they match."""
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge carrying a fresh nonce; always returns False
        so callers can 'return self._return_auth_challenge(...)'."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if not request_handler.headers.has_key('Proxy-Authorization'):
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if self._users.has_key(auth_dict["username"]):
                password = self._users[ auth_dict["username"] ]
            else:
                return self._return_auth_challenge(request_handler)
            # Each nonce is single-use: reject unknown ones, consume known
            # ones so replays fail.
            if not auth_dict.get("nonce") in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

            auth_validated = False

            # MSIE uses short_path in its validation, but Python's
            # urllib2 uses the full path, so we're going to see if
            # either of them works here.

            for path in [request_handler.path, request_handler.short_path]:
                if self._validate_auth(auth_dict,
                                       password,
                                       request_handler.command,
                                       path):
                    auth_validated = True

            if not auth_validated:
                return self._return_auth_challenge(request_handler)
            return True
189 # Proxy test infrastructure
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  It main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # Remember the path component alone; DigestAuthHandler tries both
        # the full request path and this short form during validation.
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")
class BaseTestCase(unittest.TestCase):
    """Common setup/teardown that records and verifies thread counts so
    leaked server threads are reported by the test framework."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
class ProxyAuthTests(BaseTestCase):
    """Exercise urllib2's ProxyDigestAuthHandler against the fake
    digest-authenticating proxy defined above."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # Bind the shared auth handler into each per-request handler instance.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()
def GetRequestHandler(responses):
    """Build a request-handler class that replays `responses` — a list of
    (status_code, headers, body) tuples — one per incoming request, while
    recording what it received in class attributes for later assertions."""

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Class-level so tests can inspect them after the handler instance
        # has been thrown away.
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                # Header values may embed a %s placeholder for the port
                # (e.g. a Location redirect back to this server).
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
338 class TestUrlopen(BaseTestCase
):
339 """Tests urllib2.urlopen using the network.
341 These tests are not exhaustive. Assuming that testing using files does a
342 good job overall of some of the basic interface features. There are no
343 tests exercising the optional 'data' and 'proxies' arguments. No tests
344 for transparent redirection have been written.
347 def start_server(self
, responses
):
348 handler
= GetRequestHandler(responses
)
350 self
.server
= LoopbackHttpServerThread(handler
)
352 self
.server
.ready
.wait()
353 port
= self
.server
.port
358 def test_redirection(self
):
359 expected_response
= 'We got here...'
361 (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
362 (200, [], expected_response
)
365 handler
= self
.start_server(responses
)
368 f
= urllib2
.urlopen('http://localhost:%s/' % handler
.port
)
372 self
.assertEquals(data
, expected_response
)
373 self
.assertEquals(handler
.requests
, ['/', '/somewhere_else'])
379 expected_response
= 'Bad bad bad...'
380 handler
= self
.start_server([(404, [], expected_response
)])
384 urllib2
.urlopen('http://localhost:%s/weeble' % handler
.port
)
385 except urllib2
.URLError
, f
:
388 self
.fail('404 should raise URLError')
393 self
.assertEquals(data
, expected_response
)
394 self
.assertEquals(handler
.requests
, ['/weeble'])
400 expected_response
= 'pycon 2008...'
401 handler
= self
.start_server([(200, [], expected_response
)])
404 f
= urllib2
.urlopen('http://localhost:%s/bizarre' % handler
.port
)
408 self
.assertEquals(data
, expected_response
)
409 self
.assertEquals(handler
.requests
, ['/bizarre'])
413 def test_200_with_parameters(self
):
414 expected_response
= 'pycon 2008...'
415 handler
= self
.start_server([(200, [], expected_response
)])
418 f
= urllib2
.urlopen('http://localhost:%s/bizarre' % handler
.port
, 'get=with_feeling')
422 self
.assertEquals(data
, expected_response
)
423 self
.assertEquals(handler
.requests
, ['/bizarre', 'get=with_feeling'])
428 def test_sending_headers(self
):
429 handler
= self
.start_server([(200, [], "we don't care")])
432 req
= urllib2
.Request("http://localhost:%s/" % handler
.port
,
433 headers
={'Range': 'bytes=20-39'})
435 self
.assertEqual(handler
.headers_received
['Range'], 'bytes=20-39')
439 def test_basic(self
):
440 handler
= self
.start_server([(200, [], "we don't care")])
443 open_url
= urllib2
.urlopen("http://localhost:%s" % handler
.port
)
444 for attr
in ("read", "close", "info", "geturl"):
445 self
.assertTrue(hasattr(open_url
, attr
), "object returned from "
446 "urlopen lacks the %s attribute" % attr
)
448 self
.assertTrue(open_url
.read(), "calling 'read' failed")
455 handler
= self
.start_server([(200, [], "we don't care")])
458 open_url
= urllib2
.urlopen("http://localhost:%s" % handler
.port
)
459 info_obj
= open_url
.info()
460 self
.assertTrue(isinstance(info_obj
, mimetools
.Message
),
461 "object returned by 'info' is not an instance of "
463 self
.assertEqual(info_obj
.getsubtype(), "plain")
467 def test_geturl(self
):
468 # Make sure same URL as opened is returned by geturl.
469 handler
= self
.start_server([(200, [], "we don't care")])
472 open_url
= urllib2
.urlopen("http://localhost:%s" % handler
.port
)
473 url
= open_url
.geturl()
474 self
.assertEqual(url
, "http://localhost:%s" % handler
.port
)
479 def test_bad_address(self
):
480 # Make sure proper exception is raised when connecting to a bogus
482 self
.assertRaises(IOError,
483 # Given that both VeriSign and various ISPs have in
484 # the past or are presently hijacking various invalid
485 # domain name requests in an attempt to boost traffic
486 # to their own sites, finding a domain name to use
487 # for this test is difficult. RFC2606 leads one to
488 # believe that '.invalid' should work, but experience
489 # seemed to indicate otherwise. Single character
490 # TLDs are likely to remain invalid, so this seems to
491 # be the best choice. The trailing '.' prevents a
492 # related problem: The normal DNS resolver appends
493 # the domain names from the search path if there is
494 # no '.' the end and, and if one of those domains
495 # implements a '*' rule a result is returned.
496 # However, none of this will prevent the test from
497 # failing if the ISP hijacks all invalid domain
498 # requests. The real solution would be to be able to
499 # parameterize the framework with a mock resolver.
500 urllib2
.urlopen
, "http://sadflkjsasf.i.nvali.d./")
def test_main():
    """Entry point used by regrtest to run this module's test cases."""
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()