Add NEWS entry as per RDM's suggestion (the bug was actually present
[python.git] / Lib / test / test_urllib2.py
blobe04d4a0412ec1a965409d0496ec5a6ced8ab73a6
1 import unittest
2 from test import test_support
4 import os
5 import socket
6 import StringIO
8 import urllib2
9 from urllib2 import Request, OpenerDirector
11 # XXX
12 # Request
13 # CacheFTPHandler (hard to write)
14 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
16 class TrivialTests(unittest.TestCase):
17 def test_trivial(self):
18 # A couple trivial tests
20 self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
22 # XXX Name hacking to get this to work on Windows.
23 fname = os.path.abspath(urllib2.__file__).replace('\\', '/')
24 if fname[1:2] == ":":
25 fname = fname[2:]
26 # And more hacking to get it to work on MacOS. This assumes
27 # urllib.pathname2url works, unfortunately...
28 if os.name == 'mac':
29 fname = '/' + fname.replace(':', '/')
30 elif os.name == 'riscos':
31 import string
32 fname = os.expand(fname)
33 fname = fname.translate(string.maketrans("/.", "./"))
35 file_url = "file://%s" % fname
36 f = urllib2.urlopen(file_url)
38 buf = f.read()
39 f.close()
41 def test_parse_http_list(self):
42 tests = [('a,b,c', ['a', 'b', 'c']),
43 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
44 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
45 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
46 for string, list in tests:
47 self.assertEquals(urllib2.parse_http_list(string), list)
def test_request_headers_dict():
    """
    Doctest for the undocumented Request.headers dictionary.

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?) that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
def test_request_headers_methods():
    """
    Doctest for Request.has_header(), .header_items(), .add_header() and
    .get_header().

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
def test_password_manager(self):
    # NOTE(review): the body never uses ``self``; this looks like a doctest
    # holder rather than a TestCase method -- confirm before changing the
    # signature.
    """
    Doctest for HTTPPasswordMgr.add_password() / .find_user_password():
    lookup by realm and URI, including path prefixes, conflicting entries,
    unrelated hosts and explicit ports.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
def test_password_manager_default_port(self):
    # NOTE(review): the body never uses ``self``; this looks like a doctest
    # holder rather than a TestCase method -- confirm before changing the
    # signature.
    """
    Doctest for how HTTPPasswordMgr treats default ports.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
class MockOpener:
    """Opener stand-in that just remembers what open()/error() were given."""

    addheaders = []

    def open(self, req, data=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        # Record the call for later inspection; nothing is actually opened.
        self.req = req
        self.data = data
        self.timeout = timeout

    def error(self, proto, *args):
        # Record the protocol and the remaining positional arguments.
        self.proto = proto
        self.args = args
class MockFile:
    """Inert file-like object: every method accepts its arguments and
    returns None."""

    def read(self, count=None):
        return None

    def readline(self, count=None):
        return None

    def close(self):
        return None
class MockHeaders(dict):
    """Dict-backed header container for tests."""

    def getheaders(self, name):
        # The requested name is deliberately ignored: all stored values
        # are returned regardless of which header was asked for.
        return dict.values(self)
241 class MockResponse(StringIO.StringIO):
242 def __init__(self, code, msg, headers, data, url=None):
243 StringIO.StringIO.__init__(self, data)
244 self.code, self.msg, self.headers, self.url = code, msg, headers, url
245 def info(self):
246 return self.headers
247 def geturl(self):
248 return self.url
class MockCookieJar:
    """Cookie-jar stand-in that records the objects passed to it."""

    def add_cookie_header(self, request):
        # "ach" = add_cookie_header
        self.ach_req = request

    def extract_cookies(self, response, request):
        # "ec" = extract_cookies
        self.ec_req = request
        self.ec_r = response
class FakeMethod:
    """Callable that forwards to ``handle(meth_name, action, *args)``."""

    def __init__(self, meth_name, action, handle):
        self.meth_name = meth_name
        self.handle = handle
        self.action = action

    def __call__(self, *args):
        # Prefix the caller's arguments with this method's name and action.
        forwarded = (self.meth_name, self.action) + args
        return self.handle(*forwarded)
class MockHandler:
    """Scriptable handler whose methods record their calls on the parent.

    ``methods`` is a sequence of specs.  A spec is either a method name
    (the method records the call and returns None) or a two-item
    ``(name, action)`` sequence, where ``action`` selects one of the
    behaviours implemented in handle().
    """
    # useful for testing handler machinery
    # see add_ordered_mock_handlers() docstring
    handler_order = 500

    def __init__(self, methods):
        self._define_methods(methods)

    def _define_methods(self, methods):
        for spec in methods:
            # Only a genuine two-item tuple/list is a (name, action) pair.
            # Testing len() alone would also split a two-character method
            # name string into (char, char).
            if isinstance(spec, (tuple, list)) and len(spec) == 2:
                name, action = spec
            else:
                name, action = spec, None
            meth = FakeMethod(name, action, self.handle)
            # The fake is installed on the class, not the instance.
            setattr(self.__class__, name, meth)

    def handle(self, fn_name, action, *args, **kwds):
        """Record the call on ``self.parent.calls`` and perform ``action``.

        Supported actions: None (return None), "return self",
        "return response", "return request", "error <code>" (delegates to
        ``self.parent.error``) and "raise" (raises urllib2.URLError).
        Any other action raises AssertionError.
        """
        self.parent.calls.append((self, fn_name, args, kwds))
        if action is None:
            return None
        elif action == "return self":
            return self
        elif action == "return response":
            res = MockResponse(200, "OK", {}, "")
            return res
        elif action == "return request":
            return Request("http://blah/")
        elif action.startswith("error"):
            # The code is whatever follows the last space; it stays a
            # string when it is not an integer.
            code = action[action.rfind(" ")+1:]
            try:
                code = int(code)
            except ValueError:
                pass
            res = MockResponse(200, "OK", {}, "")
            return self.parent.error("http", args[0], res, code, "", {})
        elif action == "raise":
            raise urllib2.URLError("blah")
        # Raised explicitly so the check survives ``python -O``.
        raise AssertionError("unknown MockHandler action: %r" % (action,))

    def close(self):
        pass

    def add_parent(self, parent):
        # Also (re)initialises the call log kept on the parent.
        self.parent = parent
        self.parent.calls = []

    def __lt__(self, other):
        if not hasattr(other, "handler_order"):
            # No handler_order, leave in original order.  Yuck.
            return True
        return self.handler_order < other.handler_order
def add_ordered_mock_handlers(opener, meth_spec):
    """Build one MockHandler per entry of meth_spec and register each one
    with the given OpenerDirector.

    meth_spec is a list of lists; every inner list describes the methods of
    one handler and may mix plain strings and tuples, e.g.:

    [["http_error", "ftp_open"], ["http_open"]]

    gives a first handler with .http_error() and .ftp_open() and a second
    handler with .http_open().  Methods named by a plain string only record
    their arguments and return None.  A tuple additionally selects an action
    (see MockHandler.handle()), e.g.:

    [["http_error"], [("http_open", "return request")]]

    gives a handler whose .http_error() returns None and another whose
    .http_open() returns a Request object.

    The created handlers are returned in meth_spec order.
    """
    created = []
    for position, method_names in enumerate(meth_spec):
        # A fresh subclass per handler, because MockHandler installs its
        # fake methods on the class.
        class MockHandlerSubclass(MockHandler):
            pass

        handler = MockHandlerSubclass(method_names)
        # Later entries get a strictly larger handler_order.
        handler.handler_order += position
        handler.add_parent(opener)
        created.append(handler)
        opener.add_handler(handler)
    return created
339 def build_test_opener(*handler_instances):
340 opener = OpenerDirector()
341 for h in handler_instances:
342 opener.add_handler(h)
343 return opener
345 class MockHTTPHandler(urllib2.BaseHandler):
346 # useful for testing redirections and auth
347 # sends supplied headers and code as first response
348 # sends 200 OK as second response
349 def __init__(self, code, headers):
350 self.code = code
351 self.headers = headers
352 self.reset()
353 def reset(self):
354 self._count = 0
355 self.requests = []
356 def http_open(self, req):
357 import mimetools, httplib, copy
358 from StringIO import StringIO
359 self.requests.append(copy.deepcopy(req))
360 if self._count == 0:
361 self._count = self._count + 1
362 name = httplib.responses[self.code]
363 msg = mimetools.Message(StringIO(self.headers))
364 return self.parent.error(
365 "http", req, MockFile(), self.code, name, msg)
366 else:
367 self.req = req
368 msg = mimetools.Message(StringIO("\r\n\r\n"))
369 return MockResponse(200, "OK", msg, "", req.get_full_url())
class MockPasswordManager:
    """Password manager stand-in holding a single credential and recording
    the most recent lookup."""

    def add_password(self, realm, uri, user, password):
        # Each call overwrites the previously stored credential.
        self.realm, self.url = realm, uri
        self.user, self.password = user, password

    def find_user_password(self, realm, authuri):
        # Remember what was asked for, then answer with the stored pair
        # whatever the realm or URI.
        self.target_realm, self.target_url = realm, authuri
        return (self.user, self.password)
383 class OpenerDirectorTests(unittest.TestCase):
385 def test_add_non_handler(self):
386 class NonHandler(object):
387 pass
388 self.assertRaises(TypeError,
389 OpenerDirector().add_handler, NonHandler())
391 def test_badly_named_methods(self):
392 # test work-around for three methods that accidentally follow the
393 # naming conventions for handler methods
394 # (*_open() / *_request() / *_response())
396 # These used to call the accidentally-named methods, causing a
397 # TypeError in real code; here, returning self from these mock
398 # methods would either cause no exception, or AttributeError.
400 from urllib2 import URLError
402 o = OpenerDirector()
403 meth_spec = [
404 [("do_open", "return self"), ("proxy_open", "return self")],
405 [("redirect_request", "return self")],
407 handlers = add_ordered_mock_handlers(o, meth_spec)
408 o.add_handler(urllib2.UnknownHandler())
409 for scheme in "do", "proxy", "redirect":
410 self.assertRaises(URLError, o.open, scheme+"://example.com/")
412 def test_handled(self):
413 # handler returning non-None means no more handlers will be called
414 o = OpenerDirector()
415 meth_spec = [
416 ["http_open", "ftp_open", "http_error_302"],
417 ["ftp_open"],
418 [("http_open", "return self")],
419 [("http_open", "return self")],
421 handlers = add_ordered_mock_handlers(o, meth_spec)
423 req = Request("http://example.com/")
424 r = o.open(req)
425 # Second .http_open() gets called, third doesn't, since second returned
426 # non-None. Handlers without .http_open() never get any methods called
427 # on them.
428 # In fact, second mock handler defining .http_open() returns self
429 # (instead of response), which becomes the OpenerDirector's return
430 # value.
431 self.assertEqual(r, handlers[2])
432 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
433 for expected, got in zip(calls, o.calls):
434 handler, name, args, kwds = got
435 self.assertEqual((handler, name), expected)
436 self.assertEqual(args, (req,))
438 def test_handler_order(self):
439 o = OpenerDirector()
440 handlers = []
441 for meths, handler_order in [
442 ([("http_open", "return self")], 500),
443 (["http_open"], 0),
445 class MockHandlerSubclass(MockHandler): pass
446 h = MockHandlerSubclass(meths)
447 h.handler_order = handler_order
448 handlers.append(h)
449 o.add_handler(h)
451 r = o.open("http://example.com/")
452 # handlers called in reverse order, thanks to their sort order
453 self.assertEqual(o.calls[0][0], handlers[1])
454 self.assertEqual(o.calls[1][0], handlers[0])
456 def test_raise(self):
457 # raising URLError stops processing of request
458 o = OpenerDirector()
459 meth_spec = [
460 [("http_open", "raise")],
461 [("http_open", "return self")],
463 handlers = add_ordered_mock_handlers(o, meth_spec)
465 req = Request("http://example.com/")
466 self.assertRaises(urllib2.URLError, o.open, req)
467 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
469 ## def test_error(self):
470 ## # XXX this doesn't actually seem to be used in standard library,
471 ## # but should really be tested anyway...
473 def test_http_error(self):
474 # XXX http_error_default
475 # http errors are a special case
476 o = OpenerDirector()
477 meth_spec = [
478 [("http_open", "error 302")],
479 [("http_error_400", "raise"), "http_open"],
480 [("http_error_302", "return response"), "http_error_303",
481 "http_error"],
482 [("http_error_302")],
484 handlers = add_ordered_mock_handlers(o, meth_spec)
486 class Unknown:
487 def __eq__(self, other): return True
489 req = Request("http://example.com/")
490 r = o.open(req)
491 assert len(o.calls) == 2
492 calls = [(handlers[0], "http_open", (req,)),
493 (handlers[2], "http_error_302",
494 (req, Unknown(), 302, "", {}))]
495 for expected, got in zip(calls, o.calls):
496 handler, method_name, args = expected
497 self.assertEqual((handler, method_name), got[:2])
498 self.assertEqual(args, got[2])
500 def test_processors(self):
501 # *_request / *_response methods get called appropriately
502 o = OpenerDirector()
503 meth_spec = [
504 [("http_request", "return request"),
505 ("http_response", "return response")],
506 [("http_request", "return request"),
507 ("http_response", "return response")],
509 handlers = add_ordered_mock_handlers(o, meth_spec)
511 req = Request("http://example.com/")
512 r = o.open(req)
513 # processor methods are called on *all* handlers that define them,
514 # not just the first handler that handles the request
515 calls = [
516 (handlers[0], "http_request"), (handlers[1], "http_request"),
517 (handlers[0], "http_response"), (handlers[1], "http_response")]
519 for i, (handler, name, args, kwds) in enumerate(o.calls):
520 if i < 2:
521 # *_request
522 self.assertEqual((handler, name), calls[i])
523 self.assertEqual(len(args), 1)
524 self.assertTrue(isinstance(args[0], Request))
525 else:
526 # *_response
527 self.assertEqual((handler, name), calls[i])
528 self.assertEqual(len(args), 2)
529 self.assertTrue(isinstance(args[0], Request))
530 # response from opener.open is None, because there's no
531 # handler that defines http_open to handle it
532 self.assertTrue(args[1] is None or
533 isinstance(args[1], MockResponse))
536 def sanepathname2url(path):
537 import urllib
538 urlpath = urllib.pathname2url(path)
539 if os.name == "nt" and urlpath.startswith("///"):
540 urlpath = urlpath[2:]
541 # XXX don't ask me about the mac...
542 return urlpath
544 class HandlerTests(unittest.TestCase):
def test_ftp(self):
    # FTPHandler.ftp_open() must pass the parsed host, port, directories,
    # file name and transfer type on to connect_ftp()/retrfile(), and must
    # report Content-type / Content-length on the response.
    import ftplib

    class MockFTPWrapper:
        # Serves canned data and records the retrfile() arguments.
        def __init__(self, data):
            self.data = data

        def retrfile(self, filename, filetype):
            self.filename = filename
            self.filetype = filetype
            return StringIO.StringIO(self.data), len(self.data)

    class NullFTPHandler(urllib2.FTPHandler):
        # FTPHandler that never connects; connect_ftp() only records
        # what it was asked to connect to.
        def __init__(self, data):
            self.data = data

        def connect_ftp(self, user, passwd, host, port, dirs,
                        timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            self.user = user
            self.passwd = passwd
            self.host = host
            self.port = port
            self.dirs = dirs
            self.ftpwrapper = MockFTPWrapper(self.data)
            return self.ftpwrapper

    payload = "rheum rhaponicum"
    handler = NullFTPHandler(payload)
    handler.parent = MockOpener()

    # (url, host, port, transfer type, dirs, file name, mime type)
    cases = [
        ("ftp://localhost/foo/bar/baz.html",
         "localhost", ftplib.FTP_PORT, "I",
         ["foo", "bar"], "baz.html", "text/html"),
        ("ftp://localhost:80/foo/bar/",
         "localhost", 80, "D",
         ["foo", "bar"], "", None),
        ("ftp://localhost/baz.gif;type=a",
         "localhost", ftplib.FTP_PORT, "A",
         [], "baz.gif", None),  # XXX really this should guess image/gif
        ]
    for url, host, port, type_, dirs, filename, mimetype in cases:
        request = Request(url)
        request.timeout = None
        response = handler.ftp_open(request)
        # ftp authentication not yet implemented by FTPHandler
        self.assertEqual(handler.user, "")
        self.assertEqual(handler.passwd, "")
        self.assertEqual(handler.host, socket.gethostbyname(host))
        self.assertEqual(handler.port, port)
        self.assertEqual(handler.dirs, dirs)
        wrapper = handler.ftpwrapper
        self.assertEqual(wrapper.filename, filename)
        self.assertEqual(wrapper.filetype, type_)
        info = response.info()
        self.assertEqual(info.get("Content-type"), mimetype)
        self.assertEqual(int(info["Content-length"]), len(payload))
def test_file(self):
    # Exercises FileHandler.file_open() in three phases: URLs that must
    # open the local test file, URLs that must raise URLError, and
    # file:// URLs naming a remote host (handed on as ftp or rejected).
    import rfc822, socket
    h = urllib2.FileHandler()
    o = h.parent = MockOpener()

    TESTFN = test_support.TESTFN
    urlpath = sanepathname2url(os.path.abspath(TESTFN))
    towrite = "hello, world\n"
    # Spellings of the same local file: explicit localhost, empty host,
    # and the address that 'localhost' resolves to.
    urls = [
        "file://localhost%s" % urlpath,
        "file://%s" % urlpath,
        "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
        ]
    # The machine's own address is added only when its hostname resolves.
    try:
        localaddr = socket.gethostbyname(socket.gethostname())
    except socket.gaierror:
        localaddr = ''
    if localaddr:
        urls.append("file://%s%s" % (localaddr, urlpath))

    for url in urls:
        f = open(TESTFN, "wb")
        # Outer try/finally removes the test file however the body exits;
        # the assertions run only after that cleanup.
        try:
            try:
                f.write(towrite)
            finally:
                f.close()

            r = h.file_open(Request(url))
            try:
                data = r.read()
                headers = r.info()
                newurl = r.geturl()
            finally:
                r.close()
            # Expected Last-modified value, taken from the file's mtime.
            stats = os.stat(TESTFN)
            modified = rfc822.formatdate(stats.st_mtime)
        finally:
            os.remove(TESTFN)
        self.assertEqual(data, towrite)
        self.assertEqual(headers["Content-type"], "text/plain")
        # "13" is len(towrite) as a string.
        self.assertEqual(headers["Content-length"], "13")
        self.assertEqual(headers["Last-modified"], modified)

    # URLs that must be rejected with URLError: a port on the host, a
    # missing file, and a host other than this machine.
    for url in [
        "file://localhost:80%s" % urlpath,
        "file:///file_does_not_exist.txt",
        "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
                               os.getcwd(), TESTFN),
        "file://somerandomhost.ontheinternet.com%s/%s" %
        (os.getcwd(), TESTFN),
        ]:
        try:
            # The test file is created so that the failure comes from the
            # URL, not from the file being absent.
            f = open(TESTFN, "wb")
            try:
                f.write(towrite)
            finally:
                f.close()

            self.assertRaises(urllib2.URLError,
                              h.file_open, Request(url))
        finally:
            os.remove(TESTFN)

    # Fresh handler and opener, so that o.req below can only come from
    # this phase.
    h = urllib2.FileHandler()
    o = h.parent = MockOpener()
    # XXXX why does // mean ftp (and /// mean not ftp!), and where
    #  is file: scheme specified?  I think this is really a bug, and
    #  what was intended was to distinguish between URLs like:
    # file:/blah.txt (a file)
    # file://localhost/blah.txt (a file)
    # file:///blah.txt (a file)
    # file://ftp.example.com/blah.txt (an ftp URL)
    # Each case is (url, whether it is expected to be handed on as ftp).
    for url, ftp in [
        ("file://ftp.example.com//foo.txt", True),
        ("file://ftp.example.com///foo.txt", False),
# XXXX bug: fails with OSError, should be URLError
        ("file://ftp.example.com/foo.txt", False),
        ]:
        req = Request(url)
        try:
            h.file_open(req)
        # XXXX remove OSError when bug fixed
        except (urllib2.URLError, OSError):
            self.assertTrue(not ftp)
        else:
            # No exception: the same request object must have reached the
            # parent opener, with its type now "ftp".
            self.assertTrue(o.req is req)
            self.assertEqual(req.type, "ftp")
def test_http(self):
    # Covers AbstractHTTPHandler.do_open() (request sent through a fake
    # connection class, shape of the returned response, socket.error
    # mapped to URLError) and do_request_() (standard headers added
    # without clobbering existing ones).
    class MockHTTPResponse:
        def __init__(self, fp, msg, status, reason):
            self.fp = fp
            self.msg = msg
            self.status = status
            self.reason = reason
        def read(self):
            return ''
    class MockHTTPClass:
        # Acts as both the connection class and the connection instance:
        # calling it records host/timeout and returns itself.
        def __init__(self):
            self.req_headers = []
            self.data = None
            self.raise_on_endheaders = False
        def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
            self.host = host
            self.timeout = timeout
            return self
        def set_debuglevel(self, level):
            self.level = level
        def request(self, method, url, body=None, headers={}):
            # NOTE: ``headers`` is only read, never mutated, so the shared
            # default dict is harmless here.
            self.method = method
            self.selector = url
            self.req_headers += headers.items()
            self.req_headers.sort()
            if body:
                self.data = body
            if self.raise_on_endheaders:
                import socket
                raise socket.error()
        def getresponse(self):
            return MockHTTPResponse(MockFile(), {}, 200, "OK")

    h = urllib2.AbstractHTTPHandler()
    o = h.parent = MockOpener()

    url = "http://example.com/"
    for method, data in [("GET", None), ("POST", "blah")]:
        req = Request(url, data, {"Foo": "bar"})
        req.timeout = None
        req.add_unredirected_header("Spam", "eggs")
        http = MockHTTPClass()
        r = h.do_open(http, req)

        # result attributes (attribute access alone checks they exist)
        r.read; r.readline  # wrapped MockFile methods
        r.info; r.geturl  # addinfourl methods
        # The original line ``r.code, r.msg == 200, "OK"`` only built a
        # tuple and asserted nothing.  Status and reason come from
        # MockHTTPClass.getresponse().
        self.assertEqual(r.code, 200)
        self.assertEqual(r.msg, "OK")
        hdrs = r.info()
        hdrs.get; hdrs.has_key  # r.info() gives the dict from getresponse()
        self.assertEqual(r.geturl(), url)

        self.assertEqual(http.host, "example.com")
        self.assertEqual(http.level, 0)
        self.assertEqual(http.method, method)
        self.assertEqual(http.selector, "/")
        self.assertEqual(http.req_headers,
                         [("Connection", "close"),
                          ("Foo", "bar"), ("Spam", "eggs")])
        self.assertEqual(http.data, data)

    # check socket.error converted to URLError
    http.raise_on_endheaders = True
    self.assertRaises(urllib2.URLError, h.do_open, http, req)

    # check adding of standard headers
    o.addheaders = [("Spam", "eggs")]
    for data in "", None:  # POST, GET
        req = Request("http://example.com/", data)
        h.do_request_(req)
        if data is None:  # GET
            self.assertTrue("Content-length" not in req.unredirected_hdrs)
            self.assertTrue("Content-type" not in req.unredirected_hdrs)
        else:  # POST
            self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
            self.assertEqual(req.unredirected_hdrs["Content-type"],
                             "application/x-www-form-urlencoded")
        # XXX the details of Host could be better tested
        self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
        self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")

        # don't clobber existing headers
        req.add_unredirected_header("Content-length", "foo")
        req.add_unredirected_header("Content-type", "bar")
        req.add_unredirected_header("Host", "baz")
        req.add_unredirected_header("Spam", "foo")
        h.do_request_(req)
        self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
        self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
        self.assertEqual(req.unredirected_hdrs["Host"], "baz")
        self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
def test_http_doubleslash(self):
    # An unnecessary double slash anywhere in the path must not break
    # anything.  Previously a double slash directly after the host could
    # cause the URL to be parsed incorrectly.
    handler = urllib2.AbstractHTTPHandler()
    handler.parent = MockOpener()

    body = ""
    urls_under_test = [
        "http://example.com/foo/bar/baz.html",
        "http://example.com//foo/bar/baz.html",
        "http://example.com/foo//bar/baz.html",
        "http://example.com/foo/bar//baz.html",
        ]

    for candidate in urls_under_test:
        request = Request(candidate, body)

        # Host must be determined correctly when there is no proxy ...
        direct = handler.do_request_(request)
        self.assertEqual(direct.unredirected_hdrs["Host"], "example.com")

        # ... and equally when the request goes through a proxy.
        request.set_proxy("someproxy:3128", None)
        proxied = handler.do_request_(request)
        self.assertEqual(proxied.unredirected_hdrs["Host"], "example.com")
def test_errors(self):
    # HTTPErrorProcessor passes 2xx responses through untouched and sends
    # everything else to the parent's error().
    processor = urllib2.HTTPErrorProcessor()
    opener = processor.parent = MockOpener()

    url = "http://example.com/"
    req = Request(url)

    # all 2xx are passed through
    passthrough = [
        (200, "OK"),
        (202, "Accepted"),
        (206, "Partial content"),
        ]
    for status, reason in passthrough:
        response = MockResponse(status, reason, {}, "", url)
        result = processor.http_response(req, response)
        self.assertTrue(response is result)
        # opener.error() was not called, so no "proto" was recorded
        self.assertTrue(not hasattr(opener, "proto"))

    # anything else calls opener.error (and MockOpener returns None, here)
    failure = MockResponse(502, "Bad gateway", {}, "", url)
    self.assertTrue(processor.http_response(req, failure) is None)
    self.assertEqual(opener.proto, "http")  # opener.error called
    self.assertEqual(opener.args, (req, failure, 502, "Bad gateway", {}))
def test_cookies(self):
    # HTTPCookieProcessor must hand the request to the jar before sending
    # and the (request, response) pair to it afterwards, returning both
    # objects unchanged.
    jar = MockCookieJar()
    processor = urllib2.HTTPCookieProcessor(jar)
    processor.parent = MockOpener()

    request = Request("http://example.com/")
    response = MockResponse(200, "OK", {}, "")

    returned_request = processor.http_request(request)
    self.assertTrue(jar.ach_req is request is returned_request)
    self.assertEqual(request.get_origin_req_host(), "example.com")
    self.assertTrue(not request.is_unverifiable())

    returned_response = processor.http_response(request, response)
    self.assertTrue(jar.ec_req is request)
    self.assertTrue(jar.ec_r is response is returned_response)
def test_redirect(self):
    # Exercises HTTPRedirectHandler: ordinary 301/302/303/307 redirects
    # for GET and POST, then the two loop-detection limits (max_repeats
    # and max_redirections).
    from_url = "http://example.com/a.html"
    to_url = "http://example.com/b.html"
    h = urllib2.HTTPRedirectHandler()
    # MockOpener.open() stores the redirected request as o.req.
    o = h.parent = MockOpener()

    # ordinary redirect behaviour
    for code in 301, 302, 303, 307:
        for data in None, "blah\nblah\n":
            method = getattr(h, "http_error_%s" % code)
            req = Request(from_url, data)
            req.add_header("Nonsense", "viking=withhold")
            req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
            if data is not None:
                req.add_header("Content-Length", str(len(data)))
            req.add_unredirected_header("Spam", "spam")
            try:
                method(req, MockFile(), code, "Blah",
                       MockHeaders({"location": to_url}))
            except urllib2.HTTPError:
                # 307 in response to POST requires user OK
                self.assertTrue(code == 307 and data is not None)
            # NOTE: when HTTPError was raised above, o.req is still the
            # request recorded by an earlier iteration, so the checks
            # below depend on the order of the loops.
            self.assertEqual(o.req.get_full_url(), to_url)
            try:
                self.assertEqual(o.req.get_method(), "GET")
            except AttributeError:
                self.assertTrue(not o.req.has_data())

            # now it's a GET, there should not be headers regarding content
            # (possibly dragged from before being a POST)
            headers = [x.lower() for x in o.req.headers]
            self.assertTrue("content-length" not in headers)
            self.assertTrue("content-type" not in headers)

            # Ordinary headers follow the redirect; unredirected ones
            # must not.
            self.assertEqual(o.req.headers["Nonsense"],
                             "viking=withhold")
            self.assertTrue("Spam" not in o.req.headers)
            self.assertTrue("Spam" not in o.req.unredirected_hdrs)

    # loop detection
    # NOTE(review): this req is rebound below before it is ever used.
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    def redirect(h, req, url=to_url):
        # Simulate one 302 response pointing at ``url``.
        h.http_error_302(req, MockFile(), 302, "Blah",
                         MockHeaders({"location": url}))
    # Note that the *original* request shares the same record of
    # redirections with the sub-requests caused by the redirections.

    # detect infinite loop redirect of a URL to itself
    req = Request(from_url, origin_req_host="example.com")
    count = 0
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    try:
        # Keep redirecting until the handler gives up; count holds the
        # number of redirects that succeeded before HTTPError.
        while 1:
            redirect(h, req, "http://example.com/")
            count = count + 1
    except urllib2.HTTPError:
        # don't stop until max_repeats, because cookies may introduce state
        self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)

    # detect endless non-repeating chain of redirects
    req = Request(from_url, origin_req_host="example.com")
    count = 0
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
    try:
        # Every target URL is different, so only the total-redirection
        # limit can stop this loop.
        while 1:
            redirect(h, req, "http://example.com/%d" % count)
            count = count + 1
    except urllib2.HTTPError:
        self.assertEqual(count,
                         urllib2.HTTPRedirectHandler.max_redirections)
def test_cookie_redirect(self):
    # cookies shouldn't leak into redirected requests
    from cookielib import CookieJar

    from test.test_cookielib import interact_netscape

    jar = CookieJar()
    # Seed the jar with a cookie for the original site only.
    interact_netscape(jar, "http://www.example.com/", "spam=eggs")

    # The first response redirects to a different site.
    http_handler = MockHTTPHandler(
        302, "Location: http://www.cracker.com/\r\n\r\n")
    opener = build_test_opener(
        http_handler,
        urllib2.HTTPDefaultErrorHandler(),
        urllib2.HTTPRedirectHandler(),
        urllib2.HTTPCookieProcessor(jar),
        )
    opener.open("http://www.example.com/")

    # http_handler.req is the request that followed the redirect.
    self.assertFalse(http_handler.req.has_header("Cookie"))
def test_proxy(self):
    # With an http proxy configured, the request's host is rewritten to
    # the proxy before the http_open handler sees it.
    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({"http": "proxy.example.com:3128"}))
    handlers = add_ordered_mock_handlers(
        opener, [[("http_open", "return response")]])

    request = Request("http://acme.example.com/")
    self.assertEqual(request.get_host(), "acme.example.com")
    opener.open(request)
    self.assertEqual(request.get_host(), "proxy.example.com:3128")

    # Exactly one call was recorded: http_open on the mock handler.
    recorded = [call[0:2] for call in opener.calls]
    self.assertEqual([(handlers[0], "http_open")], recorded)
def test_proxy_no_proxy(self):
    # With no_proxy set to 'python.org', a request for www.perl.org must
    # be rerouted to the proxy while a request for www.python.org must
    # keep its original host.
    #
    # The environment variable is restored in a ``finally`` clause so
    # that a failing assertion cannot leak the override into later tests
    # and a value that was already set before the test is not lost.
    previous = os.environ.get('no_proxy')
    os.environ['no_proxy'] = 'python.org'
    try:
        o = OpenerDirector()
        ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
        o.add_handler(ph)

        # Host outside no_proxy: rewritten to the proxy host.
        req = Request("http://www.perl.org/")
        self.assertEqual(req.get_host(), "www.perl.org")
        o.open(req)
        self.assertEqual(req.get_host(), "proxy.example.com")

        # Host ending in the no_proxy entry: left untouched.
        req = Request("http://www.python.org")
        self.assertEqual(req.get_host(), "www.python.org")
        o.open(req)
        self.assertEqual(req.get_host(), "www.python.org")
    finally:
        if previous is None:
            del os.environ['no_proxy']
        else:
            os.environ['no_proxy'] = previous
def test_proxy_https(self):
    # Same check as for plain http proxying, but for the https scheme:
    # the request host becomes the proxy and https_open gets called.
    opener = OpenerDirector()
    opener.add_handler(
        urllib2.ProxyHandler({'https': 'proxy.example.com:3128'}))
    handlers = add_ordered_mock_handlers(
        opener,
        [
            [("https_open", "return response")],
        ])

    request = Request("https://www.example.com/")
    self.assertEqual(request.get_host(), "www.example.com")
    opener.open(request)
    self.assertEqual(request.get_host(), "proxy.example.com:3128")

    # Only the (handler, method name) part of each recorded call matters.
    recorded = [call[0:2] for call in opener.calls]
    self.assertEqual([(handlers[0], "https_open")], recorded)
def test_basic_auth(self, quote_char='"'):
    # Run the shared basic-auth scenario against a 401 challenge whose
    # realm value is delimited by quote_char.
    realm = "ACME Widget Store"
    protected_url = "http://acme.example.com/protected"
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))

    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(401, challenge)

    opener = OpenerDirector()
    for handler in (auth_handler, http_handler):
        opener.add_handler(handler)

    # The URL the password is registered for and the URL looked up in
    # the password manager are the same here.
    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          protected_url, protected_url)
def test_basic_auth_with_single_quoted_realm(self):
    # Re-run test_basic_auth with the realm in the challenge wrapped in
    # single quotes rather than the default double quotes.
    single_quote = "'"
    self.test_basic_auth(quote_char=single_quote)
def test_proxy_basic_auth(self):
    # Run the shared basic-auth scenario for proxy authentication: the
    # mock server answers 407 with a Proxy-Authenticate challenge and the
    # credentials are expected in the Proxy-authorization header.
    realm = "ACME Networks"
    challenge = 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm

    proxy_handler = urllib2.ProxyHandler(
        {"http": "proxy.example.com:3128"})
    password_manager = MockPasswordManager()
    auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
    http_handler = MockHTTPHandler(407, challenge)

    opener = OpenerDirector()
    for handler in (proxy_handler, auth_handler, http_handler):
        opener.add_handler(handler)

    # The password is registered for the origin URL, but the lookup is
    # expected to be made for the proxy host.
    self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                          realm, http_handler, password_manager,
                          "http://acme.example.com:3128/protected",
                          "proxy.example.com:3128")
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler used to raise when it could not handle a 40*
    # response (http://python.org/sf/1479302); it should return None
    # instead so that another handler (HTTPBasicAuthHandler in
    # particular) gets a chance to handle the response.
    #
    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), digest
    # must be tried first because it is the strongest auth scheme, so
    # the order in which the handlers are invoked is recorded and
    # checked at the end.
    class RecordingOpenerDirector(OpenerDirector):
        # OpenerDirector that keeps a list of the labels given to
        # record().
        def __init__(self):
            OpenerDirector.__init__(self)
            self.recorded = []

        def record(self, info):
            self.recorded.append(info)

    class DigestRecorder(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwargs):
            # Log the call on the opener, then delegate to the base
            # class; the base class result is not returned.
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(
                self, *args, **kwargs)

    class BasicRecorder(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwargs):
            # Log the call on the opener, then delegate to the base
            # class; the base class result is not returned.
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(
                self, *args, **kwargs)

    realm = "ACME Networks"
    protected_url = "http://acme.example.com/protected"

    password_manager = MockPasswordManager()
    digest_handler = DigestRecorder(password_manager)
    basic_handler = BasicRecorder(password_manager)
    http_handler = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)

    opener = RecordingOpenerDirector()
    # The basic handler is registered before the digest handler.
    for handler in (basic_handler, digest_handler, http_handler):
        opener.add_handler(handler)

    # check basic auth isn't blocked by digest handler failing
    self._test_basic_auth(opener, basic_handler, "Authorization",
                          realm, http_handler, password_manager,
                          protected_url, protected_url)

    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    self.assertEqual(opener.recorded, ["digest", "basic"]*2)
def _test_basic_auth(self, opener, auth_handler, auth_header,
                     realm, http_handler, password_manager,
                     request_url, protected_url):
    # Shared scenario for the basic-auth tests.
    #
    # opener           -- opener with auth_handler and http_handler added
    # auth_handler     -- handler whose add_password() is exercised
    # auth_header      -- name of the header expected to carry credentials
    # realm            -- realm announced in the mock challenge
    # http_handler     -- mock handler exposing .requests and .reset()
    # password_manager -- mock manager recording what it was given/asked
    # request_url      -- URL that is registered and opened
    # protected_url    -- URL the password manager should be queried for
    import base64
    user, password = "wile", "coyote"

    # .add_password() must be passed straight through to the manager.
    auth_handler.add_password(realm, request_url, user, password)
    for attribute, expected in (("realm", realm),
                                ("url", request_url),
                                ("user", user),
                                ("password", password)):
        self.assertEqual(expected, getattr(password_manager, attribute))

    opener.open(request_url)

    # The manager must have been asked for the credentials.
    self.assertEqual(password_manager.target_realm, realm)
    self.assertEqual(password_manager.target_url, protected_url)

    # Two requests are expected: the first without credentials, the
    # second carrying them.
    seen = http_handler.requests
    self.assertEqual(len(seen), 2)
    self.assertFalse(seen[0].has_header(auth_header))
    credentials = '%s:%s' % (user, password)
    expected_value = 'Basic ' + base64.encodestring(credentials).strip()
    self.assertEqual(seen[1].get_header(auth_header), expected_value)

    # When the manager has no password to offer, the handler must not
    # handle the auth error: a single, unauthenticated request is made.
    password_manager.user = password_manager.password = None
    http_handler.reset()
    opener.open(request_url)
    self.assertEqual(len(http_handler.requests), 1)
    self.assertFalse(http_handler.requests[0].has_header(auth_header))
class MiscTests(unittest.TestCase):
    # Tests for urllib2.build_opener().

    def test_build_opener(self):
        class MyHTTPHandler(urllib2.HTTPHandler):
            pass

        class FooHandler(urllib2.BaseHandler):
            def foo_open(self):
                pass

        class BarHandler(urllib2.BaseHandler):
            def bar_open(self):
                pass

        build_opener = urllib2.build_opener

        o = build_opener(FooHandler, BarHandler)
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # can take a mix of classes and instances
        o = build_opener(FooHandler, BarHandler())
        self.opener_has_handler(o, FooHandler)
        self.opener_has_handler(o, BarHandler)

        # subclasses of default handlers override default handlers
        o = build_opener(MyHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)

        # a particular case of overriding: default handlers can be passed
        # in explicitly
        o = build_opener()
        self.opener_has_handler(o, urllib2.HTTPHandler)
        o = build_opener(urllib2.HTTPHandler)
        self.opener_has_handler(o, urllib2.HTTPHandler)
        o = build_opener(urllib2.HTTPHandler())
        self.opener_has_handler(o, urllib2.HTTPHandler)

        # Issue2670: multiple handlers sharing the same base class
        class MyOtherHTTPHandler(urllib2.HTTPHandler):
            pass

        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
        self.opener_has_handler(o, MyHTTPHandler)
        self.opener_has_handler(o, MyOtherHTTPHandler)

    def opener_has_handler(self, opener, handler_class):
        # Fail the test unless opener.handlers holds an instance whose
        # __class__ equals handler_class.  The comparison is on the exact
        # class, so an instance of a subclass does not count.
        found = [h.__class__ for h in opener.handlers]
        if not any(cls == handler_class for cls in found):
            # Name what was expected and what is present; a bare
            # assertTrue(False) gives no clue about which check failed.
            self.fail("opener has no handler of class %r; found %r" %
                      (handler_class, found))
class RequestTests(unittest.TestCase):
    # Checks of the urllib2.Request accessor methods, using one request
    # built without data and one built with data and an extra header.

    def setUp(self):
        url = "http://www.python.org/~jeremy/"
        self.get = urllib2.Request(url)
        self.post = urllib2.Request(url, "data",
                                    headers={"X-Test": "test"})

    def test_method(self):
        # A request with data reports POST, one without reports GET.
        self.assertEqual("POST", self.post.get_method())
        self.assertEqual("GET", self.get.get_method())

    def test_add_data(self):
        # Adding data to a data-less request switches it to POST.
        request = self.get
        self.assertFalse(request.has_data())
        self.assertEqual("GET", request.get_method())
        request.add_data("spam")
        self.assertTrue(request.has_data())
        self.assertEqual("POST", request.get_method())

    def test_get_full_url(self):
        self.assertEqual("http://www.python.org/~jeremy/",
                         self.get.get_full_url())

    def test_selector(self):
        self.assertEqual("/~jeremy/", self.get.get_selector())
        root_request = urllib2.Request("http://www.python.org/")
        self.assertEqual("/", root_request.get_selector())

    def test_get_type(self):
        self.assertEqual("http", self.get.get_type())

    def test_get_host(self):
        self.assertEqual("www.python.org", self.get.get_host())

    def test_get_host_unquote(self):
        # A percent-escape in the host part is decoded by get_host().
        escaped = urllib2.Request("http://www.%70ython.org/")
        self.assertEqual("www.python.org", escaped.get_host())

    def test_proxy(self):
        # After set_proxy() the host is the proxy, while the origin
        # request host still names the original server.
        request = self.get
        self.assertFalse(request.has_proxy())
        request.set_proxy("www.perl.org", "http")
        self.assertTrue(request.has_proxy())
        self.assertEqual("www.python.org", request.get_origin_req_host())
        self.assertEqual("www.perl.org", request.get_host())
def test_main(verbose=None):
    # Entry point used by the regression-test driver: run the doctests of
    # this module and of urllib2, then every unittest class of the file.
    from test import test_urllib2

    for module in (test_urllib2, urllib2):
        test_support.run_doctest(module, verbose)

    test_support.run_unittest(
        TrivialTests,
        OpenerDirectorTests,
        HandlerTests,
        MiscTests,
        RequestTests,
    )
# Run the whole suite verbosely when the file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)