move sections
[python/dscho.git] / Lib / test / test_urllib2.py
blobb5a243705a206c27e1d962d2d114523b9c70ca15
1 import unittest
2 from test import test_support
4 import os
5 import socket
6 import StringIO
8 import urllib2
9 from urllib2 import Request, OpenerDirector
11 # XXX
12 # Request
13 # CacheFTPHandler (hard to write)
14 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
class TrivialTests(unittest.TestCase):
    # Smoke tests: urlopen() on a bogus URL and on a file: URL, plus
    # urllib2.parse_http_list() on a handful of quoted/escaped inputs.

    def test_trivial(self):
        # A couple trivial tests

        self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')

        # XXX Name hacking to get this to work on Windows.
        fname = os.path.abspath(urllib2.__file__).replace('\\', '/')

        # And more hacking to get it to work on RISC OS.  This assumes
        # urllib.pathname2url works, unfortunately...
        if os.name == 'riscos':
            import string
            fname = os.expand(fname)
            fname = fname.translate(string.maketrans("/.", "./"))

        if os.name == 'nt':
            file_url = "file:///%s" % fname
        else:
            file_url = "file://%s" % fname

        f = urllib2.urlopen(file_url)
        try:
            # Only checks that reading works; the content is irrelevant.
            f.read()
        finally:
            # Close the response even when read() raises.
            f.close()

    def test_parse_http_list(self):
        # Each case is (header value, expected list of elements).
        tests = [('a,b,c', ['a', 'b', 'c']),
                 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
                 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
                 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
        # Loop names avoid shadowing the builtins ``list`` and the
        # ``string`` module.
        for header, expected in tests:
            self.assertEqual(urllib2.parse_http_list(header), expected)
def test_request_headers_dict():
    """
    Doctests for the undocumented Request.headers dictionary.

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?) that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
def test_request_headers_methods():
    """
    Doctests for Request.has_header(), .get_header() and .header_items().

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
def test_password_manager(self):
    """
    Doctests for HTTPPasswordMgr.add_password() / find_user_password().

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM (the example below is therefore
    disabled with a leading "##"):
    ## Currently, we use the highest-level path where more than one match:

    ## >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
    ## ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
def test_password_manager_default_port(self):
    """
    Doctests for default-port handling in HTTPPasswordMgr lookups.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
class MockOpener:
    """Opener stand-in that only records the arguments it was called with."""

    addheaders = []

    def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        # Remember the call so a test can inspect it afterwards.
        self.req = req
        self.data = data
        self.timeout = timeout

    def error(self, proto, *args):
        # Remember the protocol and the remaining positional arguments.
        self.proto = proto
        self.args = args
class MockFile:
    """File-like stub whose methods accept the usual arguments and do nothing."""

    def read(self, count=None):
        return None

    def readline(self, count=None):
        return None

    def close(self):
        return None
class MockHeaders(dict):
    """Dict-based header stub; getheaders() ignores the requested name."""

    def getheaders(self, name):
        # Every stored value is returned regardless of ``name``.
        return dict.values(self)
class MockResponse(StringIO.StringIO):
    """In-memory response: a StringIO body plus code, msg, headers and url."""

    def __init__(self, code, msg, headers, data, url=None):
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url

    def info(self):
        # Headers object exactly as passed to the constructor.
        return self.headers

    def geturl(self):
        # URL exactly as passed to the constructor (None by default).
        return self.url
class MockCookieJar:
    """Cookie-jar stub that records what it was asked to process."""

    def add_cookie_header(self, request):
        # Keep the request so the test can check identity later.
        self.ach_req = request

    def extract_cookies(self, response, request):
        # Keep both objects; note the (response, request) argument order.
        self.ec_req = request
        self.ec_r = response
class FakeMethod:
    """Callable that forwards to ``handle`` with its name and action prepended."""

    def __init__(self, meth_name, action, handle):
        self.meth_name, self.action, self.handle = meth_name, action, handle

    def __call__(self, *args):
        # Delegate: handle(meth_name, action, *call_args).
        forwarded = (self.meth_name, self.action) + args
        return self.handle(*forwarded)
class MockHTTPResponse:
    """Minimal HTTP response stub: stores its four fields, read() is empty."""

    def __init__(self, fp, msg, status, reason):
        self.fp, self.msg = fp, msg
        self.status, self.reason = status, reason

    def read(self):
        # The body is always empty.
        return ''
class MockHTTPClass:
    """Connection-class stub: calling the instance "connects" and returns it.

    Request details are recorded on the instance (``method``, ``selector``,
    ``req_headers``, ``data``) so tests can inspect what was sent.
    """

    def __init__(self):
        self.req_headers = []
        self.data = None
        self.raise_on_endheaders = False
        self._tunnel_headers = {}

    def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        # Acts as the "constructor" call made on a connection class.
        self.host, self.timeout = host, timeout
        return self

    def set_debuglevel(self, level):
        self.level = level

    def set_tunnel(self, host, port=None, headers=None):
        self._tunnel_host, self._tunnel_port = host, port
        if not headers:
            # No (or empty) headers: empty the current mapping in place.
            self._tunnel_headers.clear()
        else:
            self._tunnel_headers = headers

    def request(self, method, url, body=None, headers=None):
        self.method, self.selector = method, url
        if headers is not None:
            self.req_headers.extend(headers.items())
        # Kept sorted so tests can compare against a fixed list.
        self.req_headers.sort()
        if body:
            self.data = body
        if self.raise_on_endheaders:
            # Simulate a network failure while sending the request.
            raise socket.error()

    def getresponse(self):
        # Always a canned 200 OK response with no headers.
        return MockHTTPResponse(MockFile(), {}, 200, "OK")
class MockHandler:
    """Handler whose methods are generated from a spec and record their calls.

    Useful for testing handler machinery; see the
    add_ordered_mock_handlers() docstring for the spec format.  Every
    generated method appends ``(handler, method_name, args, kwds)`` to
    ``self.parent.calls`` and then performs the action named in its spec.
    """

    handler_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        """Install one FakeMethod per spec on this instance's class.

        A spec is either a bare method name (action None) or a
        ``(name, action)`` pair.  The distinction is made by type, not by
        length, so a two-character method name is not mistaken for a pair.
        """
        for spec in methods:
            if isinstance(spec, (tuple, list)):
                name, action = spec
            else:
                name, action = spec, None
            meth = FakeMethod(name, action, self.handle)
            # Set on the class: callers create a fresh subclass per handler.
            setattr(self.__class__, name, meth)

    def handle(self, fn_name, action, *args, **kwds):
        """Record the call on the parent, then carry out ``action``.

        Supported actions: None, "return self", "return response",
        "return request", "error <code>" and "raise".  Any other action
        raises AssertionError.
        """
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            # Text after the last space is the code; it stays a string
            # when it is not numeric.
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib2.URLError("blah")
        # Explicit raise so the check survives ``python -O``.
        raise AssertionError("unknown mock action: %r" % (action,))

    def close(self): pass

    def add_parent(self, parent):
        # Also resets the parent's call log.
        self.parent = parent
        self.parent.calls = []

    def __lt__(self, other):
        if not hasattr(other, "handler_order"):
            # No handler_order, leave in original order.  Yuck.
            return True
        return self.handler_order < other.handler_order
def add_ordered_mock_handlers(opener, meth_spec):
    """Build one MockHandler per entry of meth_spec and register each.

    meth_spec is a list of lists; each inner list describes the methods of
    one handler, using strings and tuples.  For example:

    [["http_error", "ftp_open"], ["http_open"]]

    gives the first handler .http_error() and .ftp_open(), and the second
    handler .http_open().  A method named by a plain string records its
    arguments and returns None.  Naming it with a tuple makes it perform an
    action as well (see MockHandler.handle()), for example:

    [["http_error"], [("http_open", "return request")]]

    gives one handler an .http_error() that returns None and another an
    .http_open() that returns a Request object.

    Returns the list of created handlers, in meth_spec order.
    """
    created = []
    for position, meths in enumerate(meth_spec):
        # A fresh subclass per handler, because MockHandler installs its
        # generated methods on the class.
        class MockHandlerSubclass(MockHandler):
            pass
        handler = MockHandlerSubclass(meths)
        # Offset handler_order by list position.
        handler.handler_order += position
        handler.add_parent(opener)
        created.append(handler)
        opener.add_handler(handler)
    return created
def build_test_opener(*handler_instances):
    """Return a new OpenerDirector with the given handlers added in order."""
    director = OpenerDirector()
    for handler in handler_instances:
        director.add_handler(handler)
    return director
class MockHTTPHandler(urllib2.BaseHandler):
    """HTTP handler stub for redirection and auth tests.

    The first request is answered with the code and headers supplied to the
    constructor (through the parent's error machinery); every later request
    gets a 200 OK response.
    """

    def __init__(self, code, headers):
        self.code, self.headers = code, headers
        self.reset()

    def reset(self):
        # Forget all earlier requests and start again at the first response.
        self._count = 0
        self.requests = []

    def http_open(self, req):
        import mimetools, httplib, copy
        from StringIO import StringIO
        # Store a deep copy of the request as received.
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            # Second and later requests: plain 200 OK with empty headers.
            self.req = req
            empty_msg = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", empty_msg, "", req.get_full_url())
        # First request: report the configured status through the parent.
        self._count += 1
        reason = httplib.responses[self.code]
        header_msg = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, reason, header_msg)
class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
    """HTTPS handler that routes every request through a MockHTTPClass.

    Useful for testing the Proxy-Authorization request: the properties of
    ``self.httpconn`` can be checked after the request was opened.
    """

    def __init__(self):
        urllib2.AbstractHTTPHandler.__init__(self)
        # Single shared mock connection object.
        self.httpconn = MockHTTPClass()

    def https_open(self, req):
        connection = self.httpconn
        return self.do_open(connection, req)
class MockPasswordManager:
    """Password-manager stub holding one credential and the last lookup."""

    def add_password(self, realm, uri, user, password):
        # Overwrites whatever was stored before.
        self.realm, self.url = realm, uri
        self.user, self.password = user, password

    def find_user_password(self, realm, authuri):
        # Record what was asked for, then return the stored credential
        # regardless of the realm/URI requested.
        self.target_realm, self.target_url = realm, authuri
        return (self.user, self.password)
class OpenerDirectorTests(unittest.TestCase):
    # Tests of how OpenerDirector dispatches to handler methods, using
    # MockHandler instances that log every call on ``opener.calls``.

    def test_add_non_handler(self):
        class NonHandler(object):
            pass
        self.assertRaises(TypeError,
                          OpenerDirector().add_handler, NonHandler())

    def test_badly_named_methods(self):
        # test work-around for three methods that accidentally follow the
        # naming conventions for handler methods
        # (*_open() / *_request() / *_response())

        # These used to call the accidentally-named methods, causing a
        # TypeError in real code; here, returning self from these mock
        # methods would either cause no exception, or AttributeError.

        from urllib2 import URLError

        o = OpenerDirector()
        meth_spec = [
            [("do_open", "return self"), ("proxy_open", "return self")],
            [("redirect_request", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)
        o.add_handler(urllib2.UnknownHandler())
        for scheme in "do", "proxy", "redirect":
            self.assertRaises(URLError, o.open, scheme+"://example.com/")

    def test_handled(self):
        # handler returning non-None means no more handlers will be called
        o = OpenerDirector()
        meth_spec = [
            ["http_open", "ftp_open", "http_error_302"],
            ["ftp_open"],
            [("http_open", "return self")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # Second .http_open() gets called, third doesn't, since second returned
        # non-None.  Handlers without .http_open() never get any methods called
        # on them.
        # In fact, second mock handler defining .http_open() returns self
        # (instead of response), which becomes the OpenerDirector's return
        # value.
        self.assertEqual(r, handlers[2])
        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
        for expected, got in zip(calls, o.calls):
            handler, name, args, kwds = got
            self.assertEqual((handler, name), expected)
            self.assertEqual(args, (req,))

    def test_handler_order(self):
        o = OpenerDirector()
        handlers = []
        for meths, handler_order in [
            ([("http_open", "return self")], 500),
            (["http_open"], 0),
            ]:
            class MockHandlerSubclass(MockHandler): pass
            h = MockHandlerSubclass(meths)
            h.handler_order = handler_order
            handlers.append(h)
            o.add_handler(h)

        r = o.open("http://example.com/")
        # handlers called in reverse order, thanks to their sort order
        self.assertEqual(o.calls[0][0], handlers[1])
        self.assertEqual(o.calls[1][0], handlers[0])

    def test_raise(self):
        # raising URLError stops processing of request
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "raise")],
            [("http_open", "return self")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        self.assertRaises(urllib2.URLError, o.open, req)
        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])

##     def test_error(self):
##         # XXX this doesn't actually seem to be used in standard library,
##         #  but should really be tested anyway...

    def test_http_error(self):
        # XXX http_error_default
        # http errors are a special case
        o = OpenerDirector()
        meth_spec = [
            [("http_open", "error 302")],
            [("http_error_400", "raise"), "http_open"],
            [("http_error_302", "return response"), "http_error_303",
             "http_error"],
            # A bare method name; it is a string, not a one-element tuple.
            ["http_error_302"],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        class Unknown:
            # Compares equal to anything: placeholder for the response
            # object, whose identity the test cannot know in advance.
            def __eq__(self, other): return True

        req = Request("http://example.com/")
        r = o.open(req)
        # A real assertion instead of a bare ``assert``: it is not stripped
        # by ``python -O`` and reports both values on failure.
        self.assertEqual(len(o.calls), 2)
        calls = [(handlers[0], "http_open", (req,)),
                 (handlers[2], "http_error_302",
                  (req, Unknown(), 302, "", {}))]
        for expected, got in zip(calls, o.calls):
            handler, method_name, args = expected
            self.assertEqual((handler, method_name), got[:2])
            self.assertEqual(args, got[2])

    def test_processors(self):
        # *_request / *_response methods get called appropriately
        o = OpenerDirector()
        meth_spec = [
            [("http_request", "return request"),
             ("http_response", "return response")],
            [("http_request", "return request"),
             ("http_response", "return response")],
            ]
        handlers = add_ordered_mock_handlers(o, meth_spec)

        req = Request("http://example.com/")
        r = o.open(req)
        # processor methods are called on *all* handlers that define them,
        # not just the first handler that handles the request
        calls = [
            (handlers[0], "http_request"), (handlers[1], "http_request"),
            (handlers[0], "http_response"), (handlers[1], "http_response")]

        for i, (handler, name, args, kwds) in enumerate(o.calls):
            if i < 2:
                # *_request
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 1)
                self.assertIsInstance(args[0], Request)
            else:
                # *_response
                self.assertEqual((handler, name), calls[i])
                self.assertEqual(len(args), 2)
                self.assertIsInstance(args[0], Request)
                # response from opener.open is None, because there's no
                # handler that defines http_open to handle it
                self.assertTrue(args[1] is None or
                                isinstance(args[1], MockResponse))
def sanepathname2url(path):
    """Convert a filesystem path to a URL path via urllib.pathname2url().

    On Windows ("nt") a result starting with "///" loses its first two
    slashes.
    """
    from urllib import pathname2url
    converted = pathname2url(path)
    if os.name == "nt" and converted.startswith("///"):
        return converted[2:]
    # XXX don't ask me about the mac...
    return converted
601 class HandlerTests(unittest.TestCase):
def test_ftp(self):
    # FTPHandler.ftp_open(): host, port, directory list, file name, transfer
    # type and response headers are derived correctly from the URL.
    class MockFTPWrapper:
        def __init__(self, data):
            self.data = data

        def retrfile(self, filename, filetype):
            # Record what was requested and serve the canned data.
            self.filename = filename
            self.filetype = filetype
            return StringIO.StringIO(self.data), len(self.data)

    class NullFTPHandler(urllib2.FTPHandler):
        def __init__(self, data):
            self.data = data

        def connect_ftp(self, user, passwd, host, port, dirs,
                        timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            # No network: remember the connection parameters instead.
            self.user, self.passwd = user, passwd
            self.host, self.port = host, port
            self.dirs = dirs
            self.ftpwrapper = MockFTPWrapper(self.data)
            return self.ftpwrapper

    import ftplib
    data = "rheum rhaponicum"
    h = NullFTPHandler(data)
    o = h.parent = MockOpener()

    # (url, host, port, transfer type, dirs, filename, mimetype)
    cases = [
        ("ftp://localhost/foo/bar/baz.html",
         "localhost", ftplib.FTP_PORT, "I",
         ["foo", "bar"], "baz.html", "text/html"),
        ("ftp://localhost:80/foo/bar/",
         "localhost", 80, "D",
         ["foo", "bar"], "", None),
        ("ftp://localhost/baz.gif;type=a",
         "localhost", ftplib.FTP_PORT, "A",
         [], "baz.gif", None),  # XXX really this should guess image/gif
        ]
    for url, host, port, type_, dirs, filename, mimetype in cases:
        request = Request(url)
        request.timeout = None
        response = h.ftp_open(request)
        # ftp authentication not yet implemented by FTPHandler
        self.assertTrue(h.user == h.passwd == "")
        self.assertEqual(h.host, socket.gethostbyname(host))
        self.assertEqual(h.port, port)
        self.assertEqual(h.dirs, dirs)
        wrapper = h.ftpwrapper
        self.assertEqual(wrapper.filename, filename)
        self.assertEqual(wrapper.filetype, type_)
        headers = response.info()
        self.assertEqual(headers.get("Content-type"), mimetype)
        self.assertEqual(int(headers["Content-length"]), len(data))
def test_file(self):
    # FileHandler.file_open(): local URLs are served with the right headers,
    # unusable hosts raise URLError, and some file: URLs are handed over to
    # the opener as ftp requests.
    import rfc822
    import socket
    h = urllib2.FileHandler()
    o = h.parent = MockOpener()

    TESTFN = test_support.TESTFN
    urlpath = sanepathname2url(os.path.abspath(TESTFN))
    towrite = "hello, world\n"
    urls = [
        "file://localhost%s" % urlpath,
        "file://%s" % urlpath,
        "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
        ]
    # Also try this machine's own address, when it can be resolved.
    try:
        localaddr = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        localaddr = ''
    if localaddr:
        urls.append("file://%s%s" % (localaddr, urlpath))

    for url in urls:
        fixture = open(TESTFN, "wb")
        try:
            # ``with`` closes the fixture file even if the write fails.
            with fixture:
                fixture.write(towrite)

            response = h.file_open(Request(url))
            try:
                data = response.read()
                headers = response.info()
                respurl = response.geturl()
            finally:
                response.close()
            modified = rfc822.formatdate(os.stat(TESTFN).st_mtime)
        finally:
            os.remove(TESTFN)
        self.assertEqual(data, towrite)
        self.assertEqual(headers["Content-type"], "text/plain")
        self.assertEqual(headers["Content-length"], "13")
        self.assertEqual(headers["Last-modified"], modified)
        self.assertEqual(respurl, url)

    # URLs that must be rejected with URLError.
    bad_urls = [
        "file://localhost:80%s" % urlpath,
        "file:///file_does_not_exist.txt",
        "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
                               os.getcwd(), TESTFN),
        "file://somerandomhost.ontheinternet.com%s/%s" %
        (os.getcwd(), TESTFN),
        ]
    for url in bad_urls:
        try:
            with open(TESTFN, "wb") as fixture:
                fixture.write(towrite)

            self.assertRaises(urllib2.URLError,
                              h.file_open, Request(url))
        finally:
            os.remove(TESTFN)

    h = urllib2.FileHandler()
    o = h.parent = MockOpener()
    # XXXX why does // mean ftp (and /// mean not ftp!), and where
    #  is file: scheme specified?  I think this is really a bug, and
    #  what was intended was to distinguish between URLs like:
    # file:/blah.txt (a file)
    # file://localhost/blah.txt (a file)
    # file:///blah.txt (a file)
    # file://ftp.example.com/blah.txt (an ftp URL)
    ftp_cases = [
        ("file://ftp.example.com//foo.txt", True),
        ("file://ftp.example.com///foo.txt", False),
        # XXXX bug: fails with OSError, should be URLError
        ("file://ftp.example.com/foo.txt", False),
        ]
    for url, ftp in ftp_cases:
        req = Request(url)
        try:
            h.file_open(req)
        # XXXX remove OSError when bug fixed
        except (urllib2.URLError, OSError):
            self.assertTrue(not ftp)
        else:
            self.assertTrue(o.req is req)
            self.assertEqual(req.type, "ftp")
def test_http(self):
    # AbstractHTTPHandler: do_open() sends the right request and wraps the
    # response; do_request_() adds the standard headers without clobbering
    # ones that are already set.

    h = urllib2.AbstractHTTPHandler()
    o = h.parent = MockOpener()

    url = "http://example.com/"
    for method, data in [("GET", None), ("POST", "blah")]:
        req = Request(url, data, {"Foo": "bar"})
        req.timeout = None
        req.add_unredirected_header("Spam", "eggs")
        http = MockHTTPClass()
        r = h.do_open(http, req)

        # result attributes (bare attribute access: only checks presence)
        r.read; r.readline  # wrapped MockFile methods
        r.info; r.geturl  # addinfourl methods
        # Status and reason come from MockHTTPClass.getresponse().  These
        # are real assertions; the previous ``r.code, r.msg == 200, "OK"``
        # only built a tuple and checked nothing.
        self.assertEqual(r.code, 200)
        self.assertEqual(r.msg, "OK")
        hdrs = r.info()
        hdrs.get; hdrs.has_key  # r.info() gives dict from the mock response
        self.assertEqual(r.geturl(), url)

        self.assertEqual(http.host, "example.com")
        self.assertEqual(http.level, 0)
        self.assertEqual(http.method, method)
        self.assertEqual(http.selector, "/")
        self.assertEqual(http.req_headers,
                         [("Connection", "close"),
                          ("Foo", "bar"), ("Spam", "eggs")])
        self.assertEqual(http.data, data)

    # check socket.error converted to URLError
    # (reuses ``http`` and ``req`` from the last loop iteration)
    http.raise_on_endheaders = True
    self.assertRaises(urllib2.URLError, h.do_open, http, req)

    # check adding of standard headers
    o.addheaders = [("Spam", "eggs")]
    for data in "", None:  # POST, GET
        req = Request("http://example.com/", data)
        h.do_request_(req)
        if data is None:  # GET
            self.assertNotIn("Content-length", req.unredirected_hdrs)
            self.assertNotIn("Content-type", req.unredirected_hdrs)
        else:  # POST
            self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
            self.assertEqual(req.unredirected_hdrs["Content-type"],
                             "application/x-www-form-urlencoded")
        # XXX the details of Host could be better tested
        self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
        self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")

        # don't clobber existing headers
        req.add_unredirected_header("Content-length", "foo")
        req.add_unredirected_header("Content-type", "bar")
        req.add_unredirected_header("Host", "baz")
        req.add_unredirected_header("Spam", "foo")
        h.do_request_(req)
        self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
        self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
        self.assertEqual(req.unredirected_hdrs["Host"], "baz")
        self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
def test_http_doubleslash(self):
    # An unnecessary double slash anywhere in the URL path must not break
    # anything.  Previously, a double slash directly after the host could
    # cause incorrect parsing of the URL.
    h = urllib2.AbstractHTTPHandler()
    o = h.parent = MockOpener()

    data = ""
    ds_urls = (
        "http://example.com/foo/bar/baz.html",
        "http://example.com//foo/bar/baz.html",
        "http://example.com/foo//bar/baz.html",
        "http://example.com/foo/bar//baz.html",
    )

    for ds_url in ds_urls:
        ds_req = Request(ds_url, data)

        # Host must be determined correctly when there is no proxy ...
        direct = h.do_request_(ds_req)
        self.assertEqual(direct.unredirected_hdrs["Host"],"example.com")

        # ... and also once a proxy has been set on the request.
        ds_req.set_proxy("someproxy:3128",None)
        proxied = h.do_request_(ds_req)
        self.assertEqual(proxied.unredirected_hdrs["Host"],"example.com")
def test_errors(self):
    # HTTPErrorProcessor: the 2xx responses checked here pass through
    # untouched; a 502 is routed to the opener's error().
    h = urllib2.HTTPErrorProcessor()
    o = h.parent = MockOpener()

    url = "http://example.com/"
    req = Request(url)
    # all 2xx are passed through
    passed_through = [
        (200, "OK"),
        (202, "Accepted"),
        (206, "Partial content"),
        ]
    for code, msg in passed_through:
        r = MockResponse(code, msg, {}, "", url)
        newr = h.http_response(req, r)
        self.assertTrue(r is newr)
        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
    # anything else calls o.error (and MockOpener returns None, here)
    r = MockResponse(502, "Bad gateway", {}, "", url)
    self.assertTrue(h.http_response(req, r) is None)
    self.assertEqual(o.proto, "http")  # o.error called
    self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
def test_cookies(self):
    # HTTPCookieProcessor hands the request and response to the cookie jar
    # and returns them unchanged.
    cj = MockCookieJar()
    h = urllib2.HTTPCookieProcessor(cj)
    o = h.parent = MockOpener()

    req = Request("http://example.com/")
    r = MockResponse(200, "OK", {}, "")
    newreq = h.http_request(req)
    self.assertTrue(cj.ach_req is req is newreq)
    # assertEqual rather than the deprecated assertEquals alias, matching
    # the rest of this file.
    self.assertEqual(req.get_origin_req_host(), "example.com")
    self.assertTrue(not req.is_unverifiable())
    newr = h.http_response(req, r)
    self.assertTrue(cj.ec_req is req)
    self.assertTrue(cj.ec_r is r is newr)
def test_redirect(self):
    # HTTPRedirectHandler: ordinary redirects produce a GET to the new URL
    # without content headers, and redirect loops are cut off.
    from_url = "http://example.com/a.html"
    to_url = "http://example.com/b.html"
    h = urllib2.HTTPRedirectHandler()
    o = h.parent = MockOpener()

    # ordinary redirect behaviour
    for code in (301, 302, 303, 307):
        for data in (None, "blah\nblah\n"):
            error_method = getattr(h, "http_error_%s" % code)
            req = Request(from_url, data)
            req.add_header("Nonsense", "viking=withhold")
            req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
            if data is not None:
                req.add_header("Content-Length", str(len(data)))
            req.add_unredirected_header("Spam", "spam")
            try:
                error_method(req, MockFile(), code, "Blah",
                             MockHeaders({"location": to_url}))
            except urllib2.HTTPError:
                # 307 in response to POST requires user OK
                self.assertTrue(code == 307 and data is not None)
            redirected = o.req
            self.assertEqual(redirected.get_full_url(), to_url)
            try:
                self.assertEqual(redirected.get_method(), "GET")
            except AttributeError:
                self.assertTrue(not redirected.has_data())

            # now it's a GET, there should not be headers regarding content
            # (possibly dragged from before being a POST)
            lowered = [name.lower() for name in redirected.headers]
            self.assertNotIn("content-length", lowered)
            self.assertNotIn("content-type", lowered)

            self.assertEqual(redirected.headers["Nonsense"],
                             "viking=withhold")
            self.assertNotIn("Spam", redirected.headers)
            self.assertNotIn("Spam", redirected.unredirected_hdrs)

    # loop detection
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

    def redirect(h, req, url=to_url):
        h.http_error_302(req, MockFile(), 302, "Blah",
                         MockHeaders({"location": url}))
    # Note that the *original* request shares the same record of
    # redirections with the sub-requests caused by the redirections.

    # detect infinite loop redirect of a URL to itself
    req = Request(from_url, origin_req_host="example.com")
    count = 0
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    try:
        while True:
            redirect(h, req, "http://example.com/")
            count += 1
    except urllib2.HTTPError:
        # don't stop until max_repeats, because cookies may introduce state
        self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)

    # detect endless non-repeating chain of redirects
    req = Request(from_url, origin_req_host="example.com")
    count = 0
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    try:
        while True:
            redirect(h, req, "http://example.com/%d" % count)
            count += 1
    except urllib2.HTTPError:
        self.assertEqual(count,
                         urllib2.HTTPRedirectHandler.max_redirections)
def test_cookie_redirect(self):
    # cookies shouldn't leak into redirected requests
    from cookielib import CookieJar

    from test.test_cookielib import interact_netscape

    jar = CookieJar()
    interact_netscape(jar, "http://www.example.com/", "spam=eggs")
    # The first response redirects to a different site.
    redirecting = MockHTTPHandler(
        302, "Location: http://www.cracker.com/\r\n\r\n")
    default_errors = urllib2.HTTPDefaultErrorHandler()
    redirects = urllib2.HTTPRedirectHandler()
    cookies = urllib2.HTTPCookieProcessor(jar)
    opener = build_test_opener(redirecting, default_errors, redirects, cookies)
    opener.open("http://www.example.com/")
    # The follow-up request must not carry the cookie.
    self.assertTrue(not redirecting.req.has_header("Cookie"))
def test_proxy(self):
    # ProxyHandler rewrites the request host to the proxy before the
    # http_open handler runs.
    opener = OpenerDirector()
    proxy_handler = urllib2.ProxyHandler({"http": "proxy.example.com:3128"})
    opener.add_handler(proxy_handler)
    handlers = add_ordered_mock_handlers(
        opener, [[("http_open", "return response")]])

    req = Request("http://acme.example.com/")
    self.assertEqual(req.get_host(), "acme.example.com")
    opener.open(req)
    self.assertEqual(req.get_host(), "proxy.example.com:3128")

    # Only the mock handler's http_open() was logged.
    logged = [call[0:2] for call in opener.calls]
    self.assertEqual([(handlers[0], "http_open")], logged)
973 def test_proxy_no_proxy(self):
974 os.environ['no_proxy'] = 'python.org'
975 o = OpenerDirector()
976 ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
977 o.add_handler(ph)
978 req = Request("http://www.perl.org/")
979 self.assertEqual(req.get_host(), "www.perl.org")
980 r = o.open(req)
981 self.assertEqual(req.get_host(), "proxy.example.com")
982 req = Request("http://www.python.org")
983 self.assertEqual(req.get_host(), "www.python.org")
984 r = o.open(req)
985 self.assertEqual(req.get_host(), "www.python.org")
986 del os.environ['no_proxy']
def test_proxy_https(self):
    # An https request opened through a ProxyHandler ends up addressed to
    # the proxy, and https_open is the only mock handler method called.
    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({"https": "proxy.example.com:3128"}))
    handlers = add_ordered_mock_handlers(
        opener, [[("https_open", "return response")]])

    request = Request("https://www.example.com/")
    self.assertEqual(request.get_host(), "www.example.com")
    opener.open(request)
    self.assertEqual(request.get_host(), "proxy.example.com:3128")

    # Only the first two fields of each recorded call are compared.
    invoked = [entry[:2] for entry in opener.calls]
    self.assertEqual([(handlers[0], "https_open")], invoked)
def test_proxy_https_proxy_authorization(self):
    # When an https request is tunnelled through a proxy, the
    # Proxy-Authorization header stays on the Request object but is not
    # among the headers handed to the connection; ordinary headers such
    # as User-Agent are.
    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({"https": "proxy.example.com:3128"}))
    https_handler = MockHTTPSHandler()
    opener.add_handler(https_handler)

    request = Request("https://www.example.com/")
    for header, value in (("Proxy-Authorization", "FooBar"),
                          ("User-Agent", "Grail")):
        request.add_header(header, value)
    self.assertEqual(request.get_host(), "www.example.com")
    # No tunnel is configured until the request has been opened.
    self.assertIsNone(request._tunnel_host)

    opener.open(request)

    # Headers the mock connection recorded for the request.
    sent_headers = https_handler.httpconn.req_headers
    self.assertNotIn(("Proxy-Authorization", "FooBar"), sent_headers)
    self.assertIn(("User-Agent", "Grail"), sent_headers)

    # The request itself now targets the proxy, has a tunnel host, and
    # still carries the proxy credentials.
    self.assertIsNotNone(request._tunnel_host)
    self.assertEqual(request.get_host(), "proxy.example.com:3128")
    self.assertEqual(request.get_header("Proxy-authorization"), "FooBar")
def test_basic_auth(self, quote_char='"'):
    # Drive the shared basic-auth scenario against a mock server that
    # answers 401 with a Basic challenge.  quote_char is the character
    # placed on both sides of the realm in the WWW-Authenticate header.
    realm = "ACME Widget Store"
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))

    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(401, challenge)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    # The URL requested and the URL looked up in the password manager
    # are the same for direct (non-proxy) authentication.
    url = "http://acme.example.com/protected"
    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          request_url=url,
                          protected_url=url)
1043 def test_basic_auth_with_single_quoted_realm(self):
1044 self.test_basic_auth(quote_char="'")
def test_proxy_basic_auth(self):
    # Drive the shared basic-auth scenario through a proxy: the mock
    # server answers 407 with a Proxy-Authenticate challenge and the
    # credentials are expected in the Proxy-authorization header.
    realm = "ACME Networks"

    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({"http": "proxy.example.com:3128"}))
    password_manager = MockPasswordManager()
    auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(
        407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    # The password lookup is keyed on the proxy's host:port, not on the
    # URL that was requested.
    self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                          realm, http_handler, password_manager,
                          request_url="http://acme.example.com:3128/protected",
                          protected_url="proxy.example.com:3128")
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so the
    # order of the http_error_401 calls is recorded on the opener.
    class RecordingOpenerDirector(OpenerDirector):
        # OpenerDirector that keeps a log of labels passed to record().
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, info):
            self.recorded.append(info)

    def recording_handler(base, label):
        # Build a subclass of `base` that logs `label` on its parent
        # opener each time http_error_401 is invoked.
        class RecordingAuthHandler(base):
            def http_error_401(self, *args, **kwds):
                self.parent.record(label)
                # NOTE(review): the base implementation's result is not
                # returned, so this method always yields None -- confirm
                # that this is intended.
                base.http_error_401(self, *args, **kwds)
        return RecordingAuthHandler

    realm = "ACME Networks"
    opener = RecordingOpenerDirector()
    password_manager = MockPasswordManager()
    digest_handler = recording_handler(
        urllib2.HTTPDigestAuthHandler, "digest")(password_manager)
    basic_handler = recording_handler(
        urllib2.HTTPBasicAuthHandler, "basic")(password_manager)
    http_handler = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
    # Basic is registered before digest on purpose: the recorded order
    # below must not simply mirror the registration order.
    for handler in (basic_handler, digest_handler, http_handler):
        opener.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    protected = "http://acme.example.com/protected"
    self._test_basic_auth(opener, basic_handler, "Authorization",
                          realm, http_handler, password_manager,
                          request_url=protected,
                          protected_url=protected)

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    self.assertEqual(opener.recorded, ["digest", "basic"] * 2)
def _test_basic_auth(self, opener, auth_handler, auth_header,
                     realm, http_handler, password_manager,
                     request_url, protected_url):
    # Shared driver for the basic-auth tests.
    #
    # opener           -- opener with auth_handler and http_handler installed
    # auth_handler     -- handler whose add_password() is exercised
    # auth_header      -- name of the header expected to carry credentials
    # realm            -- realm announced in the server's challenge
    # http_handler     -- mock handler recording the requests it receives
    # password_manager -- mock manager recording what it is given/asked
    # request_url      -- URL that is opened
    # protected_url    -- URL the password manager should be queried with
    import base64

    user, password = "wile", "coyote"

    # .add_password() must be fed straight through to the password manager.
    auth_handler.add_password(realm, request_url, user, password)
    for expected, attribute in ((realm, "realm"),
                                (request_url, "url"),
                                (user, "user"),
                                (password, "password")):
        self.assertEqual(expected, getattr(password_manager, attribute))

    opener.open(request_url)

    # The handler should have asked the password manager for credentials.
    self.assertEqual(password_manager.target_realm, realm)
    self.assertEqual(password_manager.target_url, protected_url)

    # Expect one request without authorization, then one with it.
    self.assertEqual(len(http_handler.requests), 2)
    self.assertFalse(http_handler.requests[0].has_header(auth_header))
    credentials = base64.encodestring('%s:%s' % (user, password)).strip()
    expected_value = 'Basic ' + credentials
    retried = http_handler.requests[1]
    self.assertEqual(retried.get_header(auth_header), expected_value)
    self.assertEqual(retried.unredirected_hdrs[auth_header], expected_value)

    # If the password manager can't find a password, the handler won't
    # handle the HTTP auth error, so only the initial request is made.
    password_manager.user = password_manager.password = None
    http_handler.reset()
    opener.open(request_url)
    self.assertEqual(len(http_handler.requests), 1)
    self.assertFalse(http_handler.requests[0].has_header(auth_header))
1146 class MiscTests(unittest.TestCase):
1148 def test_build_opener(self):
1149 class MyHTTPHandler(urllib2.HTTPHandler): pass
1150 class FooHandler(urllib2.BaseHandler):
1151 def foo_open(self): pass
1152 class BarHandler(urllib2.BaseHandler):
1153 def bar_open(self): pass
1155 build_opener = urllib2.build_opener
1157 o = build_opener(FooHandler, BarHandler)
1158 self.opener_has_handler(o, FooHandler)
1159 self.opener_has_handler(o, BarHandler)
1161 # can take a mix of classes and instances
1162 o = build_opener(FooHandler, BarHandler())
1163 self.opener_has_handler(o, FooHandler)
1164 self.opener_has_handler(o, BarHandler)
1166 # subclasses of default handlers override default handlers
1167 o = build_opener(MyHTTPHandler)
1168 self.opener_has_handler(o, MyHTTPHandler)
1170 # a particular case of overriding: default handlers can be passed
1171 # in explicitly
1172 o = build_opener()
1173 self.opener_has_handler(o, urllib2.HTTPHandler)
1174 o = build_opener(urllib2.HTTPHandler)
1175 self.opener_has_handler(o, urllib2.HTTPHandler)
1176 o = build_opener(urllib2.HTTPHandler())
1177 self.opener_has_handler(o, urllib2.HTTPHandler)
1179 # Issue2670: multiple handlers sharing the same base class
1180 class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
1181 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
1182 self.opener_has_handler(o, MyHTTPHandler)
1183 self.opener_has_handler(o, MyOtherHTTPHandler)
1185 def opener_has_handler(self, opener, handler_class):
1186 for h in opener.handlers:
1187 if h.__class__ == handler_class:
1188 break
1189 else:
1190 self.assertTrue(False)
1192 class RequestTests(unittest.TestCase):
1194 def setUp(self):
1195 self.get = urllib2.Request("http://www.python.org/~jeremy/")
1196 self.post = urllib2.Request("http://www.python.org/~jeremy/",
1197 "data",
1198 headers={"X-Test": "test"})
1200 def test_method(self):
1201 self.assertEqual("POST", self.post.get_method())
1202 self.assertEqual("GET", self.get.get_method())
1204 def test_add_data(self):
1205 self.assertTrue(not self.get.has_data())
1206 self.assertEqual("GET", self.get.get_method())
1207 self.get.add_data("spam")
1208 self.assertTrue(self.get.has_data())
1209 self.assertEqual("POST", self.get.get_method())
1211 def test_get_full_url(self):
1212 self.assertEqual("http://www.python.org/~jeremy/",
1213 self.get.get_full_url())
1215 def test_selector(self):
1216 self.assertEqual("/~jeremy/", self.get.get_selector())
1217 req = urllib2.Request("http://www.python.org/")
1218 self.assertEqual("/", req.get_selector())
1220 def test_get_type(self):
1221 self.assertEqual("http", self.get.get_type())
1223 def test_get_host(self):
1224 self.assertEqual("www.python.org", self.get.get_host())
1226 def test_get_host_unquote(self):
1227 req = urllib2.Request("http://www.%70ython.org/")
1228 self.assertEqual("www.python.org", req.get_host())
1230 def test_proxy(self):
1231 self.assertTrue(not self.get.has_proxy())
1232 self.get.set_proxy("www.perl.org", "http")
1233 self.assertTrue(self.get.has_proxy())
1234 self.assertEqual("www.python.org", self.get.get_origin_req_host())
1235 self.assertEqual("www.perl.org", self.get.get_host())
def test_main(verbose=None):
    """Run this module's doctests, urllib2's doctests, then the unit tests.

    verbose is passed through unchanged to test_support.run_doctest().
    """
    # Imported here so the module object of this very test file can be
    # handed to the doctest runner.
    from test import test_urllib2

    for module in (test_urllib2, urllib2):
        test_support.run_doctest(module, verbose)

    test_support.run_unittest(
        TrivialTests,
        OpenerDirectorTests,
        HandlerTests,
        MiscTests,
        RequestTests,
    )
if __name__ == "__main__":
    # Executed as a script: run the whole suite with verbose doctests.
    test_main(verbose=True)