2 from test
import test_support
9 from urllib2
import Request
, OpenerDirector
13 # CacheFTPHandler (hard to write)
14 # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
16 class TrivialTests(unittest
.TestCase
):
17 def test_trivial(self
):
18 # A couple trivial tests
20 self
.assertRaises(ValueError, urllib2
.urlopen
, 'bogus url')
22 # XXX Name hacking to get this to work on Windows.
23 fname
= os
.path
.abspath(urllib2
.__file__
).replace('\\', '/')
26 # And more hacking to get it to work on MacOS. This assumes
27 # urllib.pathname2url works, unfortunately...
29 fname
= '/' + fname
.replace(':', '/')
30 elif os
.name
== 'riscos':
32 fname
= os
.expand(fname
)
33 fname
= fname
.translate(string
.maketrans("/.", "./"))
35 file_url
= "file://%s" % fname
36 f
= urllib2
.urlopen(file_url
)
def test_parse_http_list(self):
    """Check urllib2.parse_http_list() on comma-separated header values.

    Each case pairs a raw header string with the list of elements the
    parser is expected to return.  The cases cover plain items, commas
    protected by double quotes, and backslash escapes inside quoted parts.
    """
    # Local names deliberately avoid shadowing the builtin ``list`` and the
    # ``string`` module name.
    cases = [
        ('a,b,c', ['a', 'b', 'c']),
        ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
        ('a, b, "c", "d", "e,f", g, h',
         ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
        ('a="b\\"c", d="e\\,f", g="h\\\\i"',
         ['a="b"c"', 'd="e,f"', 'g="h\\i"']),
    ]
    for header_value, expected in cases:
        self.assertEqual(urllib2.parse_http_list(header_value), expected)
50 def test_request_headers_dict():
52 The Request.headers dictionary is not a documented interface. It should
53 stay that way, because the complete set of headers are only accessible
54 through the .get_header(), .has_header(), .header_items() interface.
55 However, .headers pre-dates those methods, and so real code will be using
58 The introduction in 2.4 of those methods was a mistake for the same reason:
59 code that previously saw all (urllib2 user)-provided headers in .headers
60 now sees only a subset (and the function interface is ugly and incomplete).
61 A better change would have been to replace .headers dict with a dict
62 subclass (or UserDict.DictMixin instance?) that preserved the .headers
63 interface and also provided access to the "unredirected" headers. It's
64 probably too late to fix that, though.
67 Check .capitalize() case normalization:
69 >>> url = "http://example.com"
70 >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
72 >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
75 Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
76 but that could be changed in future.
80 def test_request_headers_methods():
82 Note the case normalization of header names here, to .capitalize()-case.
83 This should be preserved for backwards-compatibility. (In the HTTP case,
84 normalization to .title()-case is done by urllib2 before sending headers to
87 >>> url = "http://example.com"
88 >>> r = Request(url, headers={"Spam-eggs": "blah"})
89 >>> r.has_header("Spam-eggs")
92 [('Spam-eggs', 'blah')]
93 >>> r.add_header("Foo-Bar", "baz")
94 >>> items = r.header_items()
97 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]
99 Note that e.g. r.has_header("spam-EggS") is currently False, and
100 r.get_header("spam-EggS") returns None, but that could be changed in
103 >>> r.has_header("Not-there")
105 >>> print r.get_header("Not-there")
107 >>> r.get_header("Not-there", "default")
113 def test_password_manager(self
):
115 >>> mgr = urllib2.HTTPPasswordMgr()
116 >>> add = mgr.add_password
117 >>> add("Some Realm", "http://example.com/", "joe", "password")
118 >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
119 >>> add("c", "http://example.com/foo", "foo", "ni")
120 >>> add("c", "http://example.com/bar", "bar", "nini")
121 >>> add("b", "http://example.com/", "first", "blah")
122 >>> add("b", "http://example.com/", "second", "spam")
123 >>> add("a", "http://example.com", "1", "a")
124 >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
125 >>> add("Some Realm", "d.example.com", "4", "d")
126 >>> add("Some Realm", "e.example.com:3128", "5", "e")
128 >>> mgr.find_user_password("Some Realm", "example.com")
130 >>> mgr.find_user_password("Some Realm", "http://example.com")
132 >>> mgr.find_user_password("Some Realm", "http://example.com/")
134 >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
136 >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
138 >>> mgr.find_user_password("c", "http://example.com/foo")
140 >>> mgr.find_user_password("c", "http://example.com/bar")
143 Actually, this is really undefined ATM
144 ## Currently, we use the highest-level path where more than one match:
146 ## >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
147 ## ('joe', 'password')
149 Use latest add_password() in case of conflict:
151 >>> mgr.find_user_password("b", "http://example.com/")
154 No special relationship between a.example.com and example.com:
156 >>> mgr.find_user_password("a", "http://example.com/")
158 >>> mgr.find_user_password("a", "http://a.example.com/")
163 >>> mgr.find_user_password("Some Realm", "c.example.com")
165 >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
167 >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
169 >>> mgr.find_user_password("Some Realm", "d.example.com")
171 >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
178 def test_password_manager_default_port(self
):
180 >>> mgr = urllib2.HTTPPasswordMgr()
181 >>> add = mgr.add_password
183 The point to note here is that we can't guess the default port if there's
184 no scheme. This applies to both add_password and find_user_password.
186 >>> add("f", "http://g.example.com:80", "10", "j")
187 >>> add("g", "http://h.example.com", "11", "k")
188 >>> add("h", "i.example.com:80", "12", "l")
189 >>> add("i", "j.example.com", "13", "m")
190 >>> mgr.find_user_password("f", "g.example.com:100")
192 >>> mgr.find_user_password("f", "g.example.com:80")
194 >>> mgr.find_user_password("f", "g.example.com")
196 >>> mgr.find_user_password("f", "http://g.example.com:100")
198 >>> mgr.find_user_password("f", "http://g.example.com:80")
200 >>> mgr.find_user_password("f", "http://g.example.com")
202 >>> mgr.find_user_password("g", "h.example.com")
204 >>> mgr.find_user_password("g", "h.example.com:80")
206 >>> mgr.find_user_password("g", "http://h.example.com:80")
208 >>> mgr.find_user_password("h", "i.example.com")
210 >>> mgr.find_user_password("h", "i.example.com:80")
212 >>> mgr.find_user_password("h", "http://i.example.com:80")
214 >>> mgr.find_user_password("i", "j.example.com")
216 >>> mgr.find_user_password("i", "j.example.com:80")
218 >>> mgr.find_user_password("i", "http://j.example.com")
220 >>> mgr.find_user_password("i", "http://j.example.com:80")
def open(self, req, data=None):
    """Record the request and its data on the instance; return nothing."""
    self.req = req
    self.data = data
def error(self, proto, *args):
    """Record the protocol name and the remaining positional arguments."""
    self.proto = proto
    self.args = args
def read(self, count=None):
    """Accept and ignore *count*; do nothing and return None."""
    return None
def readline(self, count=None):
    """Accept and ignore *count*; do nothing and return None."""
    return None
def close(self):
    """Do nothing; there is no underlying resource to release."""
    return None
237 class MockHeaders(dict):
238 def getheaders(self
, name
):
241 class MockResponse(StringIO
.StringIO
):
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with *data* and store response metadata.

    The status code, message, headers and URL are kept as plain attributes
    (``code``, ``msg``, ``headers``, ``url``).
    """
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def add_cookie_header(self, request):
    """Remember the request passed in as ``ach_req``; nothing else is done."""
    self.ach_req = request
def extract_cookies(self, response, request):
    """Remember the request as ``ec_req`` and the response as ``ec_r``."""
    self.ec_req = request
    self.ec_r = response
257 def __init__(self
, meth_name
, action
, handle
):
258 self
.meth_name
= meth_name
def __call__(self, *args):
    """Forward the call to the stored ``handle`` callable.

    The stored method name and action are passed first, followed by the
    positional arguments of this call; the callable's result is returned.
    """
    dispatch = self.handle
    return dispatch(self.meth_name, self.action, *args)
265 # useful for testing handler machinery
266 # see add_ordered_mock_handlers() docstring
def __init__(self, methods):
    """Set up the handler by delegating *methods* to ``_define_methods()``."""
    install = self._define_methods
    install(methods)
270 def _define_methods(self
, methods
):
272 if len(spec
) == 2: name
, action
= spec
273 else: name
, action
= spec
, None
274 meth
= FakeMethod(name
, action
, self
.handle
)
275 setattr(self
.__class
__, name
, meth
)
276 def handle(self
, fn_name
, action
, *args
, **kwds
):
277 self
.parent
.calls
.append((self
, fn_name
, args
, kwds
))
280 elif action
== "return self":
282 elif action
== "return response":
283 res
= MockResponse(200, "OK", {}, "")
285 elif action
== "return request":
286 return Request("http://blah/")
287 elif action
.startswith("error"):
288 code
= action
[action
.rfind(" ")+1:]
293 res
= MockResponse(200, "OK", {}, "")
294 return self
.parent
.error("http", args
[0], res
, code
, "", {})
295 elif action
== "raise":
296 raise urllib2
.URLError("blah")
def close(self):
    """Do nothing; the mock handler holds nothing that needs closing."""
    return None
299 def add_parent(self
, parent
):
301 self
.parent
.calls
= []
302 def __lt__(self
, other
):
303 if not hasattr(other
, "handler_order"):
304 # No handler_order, leave in original order. Yuck.
306 return self
.handler_order
< other
.handler_order
308 def add_ordered_mock_handlers(opener
, meth_spec
):
309 """Create MockHandlers and add them to an OpenerDirector.
311 meth_spec: list of lists of tuples and strings defining methods to define
314 [["http_error", "ftp_open"], ["http_open"]]
316 defines methods .http_error() and .ftp_open() on one handler, and
317 .http_open() on another. These methods just record their arguments and
318 return None. Using a tuple instead of a string causes the method to
319 perform some action (see MockHandler.handle()), eg:
321 [["http_error"], [("http_open", "return request")]]
323 defines .http_error() on one handler (which simply returns None), and
324 .http_open() on another handler, which returns a Request object.
329 for meths
in meth_spec
:
330 class MockHandlerSubclass(MockHandler
): pass
331 h
= MockHandlerSubclass(meths
)
332 h
.handler_order
+= count
336 opener
.add_handler(h
)
339 def build_test_opener(*handler_instances
):
340 opener
= OpenerDirector()
341 for h
in handler_instances
:
342 opener
.add_handler(h
)
345 class MockHTTPHandler(urllib2
.BaseHandler
):
346 # useful for testing redirections and auth
347 # sends supplied headers and code as first response
348 # sends 200 OK as second response
349 def __init__(self
, code
, headers
):
351 self
.headers
= headers
356 def http_open(self
, req
):
357 import mimetools
, httplib
, copy
358 from StringIO
import StringIO
359 self
.requests
.append(copy
.deepcopy(req
))
361 self
._count
= self
._count
+ 1
362 name
= httplib
.responses
[self
.code
]
363 msg
= mimetools
.Message(StringIO(self
.headers
))
364 return self
.parent
.error(
365 "http", req
, MockFile(), self
.code
, name
, msg
)
368 msg
= mimetools
.Message(StringIO("\r\n\r\n"))
369 return MockResponse(200, "OK", msg
, "", req
.get_full_url())
371 class MockPasswordManager
:
372 def add_password(self
, realm
, uri
, user
, password
):
376 self
.password
= password
def find_user_password(self, realm, authuri):
    """Record the looked-up realm and URI, then return the stored credentials.

    The arguments are saved as ``target_realm`` and ``target_url`` so a test
    can inspect what was asked for.  The result is the
    ``(user, password)`` pair currently held on the instance.
    """
    self.target_realm, self.target_url = realm, authuri
    credentials = (self.user, self.password)
    return credentials
383 class OpenerDirectorTests(unittest
.TestCase
):
385 def test_add_non_handler(self
):
386 class NonHandler(object):
388 self
.assertRaises(TypeError,
389 OpenerDirector().add_handler
, NonHandler())
391 def test_badly_named_methods(self
):
392 # test work-around for three methods that accidentally follow the
393 # naming conventions for handler methods
394 # (*_open() / *_request() / *_response())
396 # These used to call the accidentally-named methods, causing a
397 # TypeError in real code; here, returning self from these mock
398 # methods would either cause no exception, or AttributeError.
400 from urllib2
import URLError
404 [("do_open", "return self"), ("proxy_open", "return self")],
405 [("redirect_request", "return self")],
407 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
408 o
.add_handler(urllib2
.UnknownHandler())
409 for scheme
in "do", "proxy", "redirect":
410 self
.assertRaises(URLError
, o
.open, scheme
+"://example.com/")
412 def test_handled(self
):
413 # handler returning non-None means no more handlers will be called
416 ["http_open", "ftp_open", "http_error_302"],
418 [("http_open", "return self")],
419 [("http_open", "return self")],
421 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
423 req
= Request("http://example.com/")
425 # Second .http_open() gets called, third doesn't, since second returned
426 # non-None. Handlers without .http_open() never get any methods called
428 # In fact, second mock handler defining .http_open() returns self
429 # (instead of response), which becomes the OpenerDirector's return
431 self
.assertEqual(r
, handlers
[2])
432 calls
= [(handlers
[0], "http_open"), (handlers
[2], "http_open")]
433 for expected
, got
in zip(calls
, o
.calls
):
434 handler
, name
, args
, kwds
= got
435 self
.assertEqual((handler
, name
), expected
)
436 self
.assertEqual(args
, (req
,))
438 def test_handler_order(self
):
441 for meths
, handler_order
in [
442 ([("http_open", "return self")], 500),
445 class MockHandlerSubclass(MockHandler
): pass
446 h
= MockHandlerSubclass(meths
)
447 h
.handler_order
= handler_order
451 r
= o
.open("http://example.com/")
452 # handlers called in reverse order, thanks to their sort order
453 self
.assertEqual(o
.calls
[0][0], handlers
[1])
454 self
.assertEqual(o
.calls
[1][0], handlers
[0])
456 def test_raise(self
):
457 # raising URLError stops processing of request
460 [("http_open", "raise")],
461 [("http_open", "return self")],
463 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
465 req
= Request("http://example.com/")
466 self
.assertRaises(urllib2
.URLError
, o
.open, req
)
467 self
.assertEqual(o
.calls
, [(handlers
[0], "http_open", (req
,), {})])
469 ## def test_error(self):
470 ## # XXX this doesn't actually seem to be used in standard library,
471 ## # but should really be tested anyway...
473 def test_http_error(self
):
474 # XXX http_error_default
475 # http errors are a special case
478 [("http_open", "error 302")],
479 [("http_error_400", "raise"), "http_open"],
480 [("http_error_302", "return response"), "http_error_303",
482 [("http_error_302")],
484 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
487 def __eq__(self
, other
): return True
489 req
= Request("http://example.com/")
491 assert len(o
.calls
) == 2
492 calls
= [(handlers
[0], "http_open", (req
,)),
493 (handlers
[2], "http_error_302",
494 (req
, Unknown(), 302, "", {}))]
495 for expected
, got
in zip(calls
, o
.calls
):
496 handler
, method_name
, args
= expected
497 self
.assertEqual((handler
, method_name
), got
[:2])
498 self
.assertEqual(args
, got
[2])
500 def test_processors(self
):
501 # *_request / *_response methods get called appropriately
504 [("http_request", "return request"),
505 ("http_response", "return response")],
506 [("http_request", "return request"),
507 ("http_response", "return response")],
509 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
511 req
= Request("http://example.com/")
513 # processor methods are called on *all* handlers that define them,
514 # not just the first handler that handles the request
516 (handlers
[0], "http_request"), (handlers
[1], "http_request"),
517 (handlers
[0], "http_response"), (handlers
[1], "http_response")]
519 for i
, (handler
, name
, args
, kwds
) in enumerate(o
.calls
):
522 self
.assertEqual((handler
, name
), calls
[i
])
523 self
.assertEqual(len(args
), 1)
524 self
.assert_(isinstance(args
[0], Request
))
527 self
.assertEqual((handler
, name
), calls
[i
])
528 self
.assertEqual(len(args
), 2)
529 self
.assert_(isinstance(args
[0], Request
))
530 # response from opener.open is None, because there's no
531 # handler that defines http_open to handle it
532 self
.assert_(args
[1] is None or
533 isinstance(args
[1], MockResponse
))
536 def sanepathname2url(path
):
538 urlpath
= urllib
.pathname2url(path
)
539 if os
.name
== "nt" and urlpath
.startswith("///"):
540 urlpath
= urlpath
[2:]
541 # XXX don't ask me about the mac...
544 class HandlerTests(unittest
.TestCase
):
class MockFTPWrapper:
    """Stand-in FTP wrapper that serves fixed data instead of using a network."""

    def __init__(self, data):
        # Payload handed back by every retrfile() call.
        self.data = data

    def retrfile(self, filename, filetype):
        """Record the requested file name and type; return (file object, length).

        The file object is a StringIO over the stored data and the length is
        the length of that data.
        """
        self.filename = filename
        self.filetype = filetype
        payload = self.data
        return StringIO.StringIO(payload), len(payload)
553 class NullFTPHandler(urllib2
.FTPHandler
):
554 def __init__(self
, data
): self
.data
= data
555 def connect_ftp(self
, user
, passwd
, host
, port
, dirs
,
556 timeout
=socket
._GLOBAL
_DEFAULT
_TIMEOUT
):
557 self
.user
, self
.passwd
= user
, passwd
558 self
.host
, self
.port
= host
, port
560 self
.ftpwrapper
= MockFTPWrapper(self
.data
)
561 return self
.ftpwrapper
564 data
= "rheum rhaponicum"
565 h
= NullFTPHandler(data
)
566 o
= h
.parent
= MockOpener()
568 for url
, host
, port
, type_
, dirs
, filename
, mimetype
in [
569 ("ftp://localhost/foo/bar/baz.html",
570 "localhost", ftplib
.FTP_PORT
, "I",
571 ["foo", "bar"], "baz.html", "text/html"),
572 ("ftp://localhost:80/foo/bar/",
573 "localhost", 80, "D",
574 ["foo", "bar"], "", None),
575 ("ftp://localhost/baz.gif;type=a",
576 "localhost", ftplib
.FTP_PORT
, "A",
577 [], "baz.gif", None), # XXX really this should guess image/gif
582 # ftp authentication not yet implemented by FTPHandler
583 self
.assert_(h
.user
== h
.passwd
== "")
584 self
.assertEqual(h
.host
, socket
.gethostbyname(host
))
585 self
.assertEqual(h
.port
, port
)
586 self
.assertEqual(h
.dirs
, dirs
)
587 self
.assertEqual(h
.ftpwrapper
.filename
, filename
)
588 self
.assertEqual(h
.ftpwrapper
.filetype
, type_
)
590 self
.assertEqual(headers
.get("Content-type"), mimetype
)
591 self
.assertEqual(int(headers
["Content-length"]), len(data
))
594 import rfc822
, socket
595 h
= urllib2
.FileHandler()
596 o
= h
.parent
= MockOpener()
598 TESTFN
= test_support
.TESTFN
599 urlpath
= sanepathname2url(os
.path
.abspath(TESTFN
))
600 towrite
= "hello, world\n"
602 "file://localhost%s" % urlpath
,
603 "file://%s" % urlpath
,
604 "file://%s%s" % (socket
.gethostbyname('localhost'), urlpath
),
607 localaddr
= socket
.gethostbyname(socket
.gethostname())
608 except socket
.gaierror
:
611 urls
.append("file://%s%s" % (localaddr
, urlpath
))
614 f
= open(TESTFN
, "wb")
621 r
= h
.file_open(Request(url
))
628 stats
= os
.stat(TESTFN
)
629 modified
= rfc822
.formatdate(stats
.st_mtime
)
632 self
.assertEqual(data
, towrite
)
633 self
.assertEqual(headers
["Content-type"], "text/plain")
634 self
.assertEqual(headers
["Content-length"], "13")
635 self
.assertEqual(headers
["Last-modified"], modified
)
638 "file://localhost:80%s" % urlpath
,
639 "file:///file_does_not_exist.txt",
640 "file://%s:80%s/%s" % (socket
.gethostbyname('localhost'),
641 os
.getcwd(), TESTFN
),
642 "file://somerandomhost.ontheinternet.com%s/%s" %
643 (os
.getcwd(), TESTFN
),
646 f
= open(TESTFN
, "wb")
652 self
.assertRaises(urllib2
.URLError
,
653 h
.file_open
, Request(url
))
657 h
= urllib2
.FileHandler()
658 o
= h
.parent
= MockOpener()
659 # XXXX why does // mean ftp (and /// mean not ftp!), and where
660 # is file: scheme specified? I think this is really a bug, and
661 # what was intended was to distinguish between URLs like:
662 # file:/blah.txt (a file)
663 # file://localhost/blah.txt (a file)
664 # file:///blah.txt (a file)
665 # file://ftp.example.com/blah.txt (an ftp URL)
667 ("file://ftp.example.com//foo.txt", True),
668 ("file://ftp.example.com///foo.txt", False),
669 # XXXX bug: fails with OSError, should be URLError
670 ("file://ftp.example.com/foo.txt", False),
675 # XXXX remove OSError when bug fixed
676 except (urllib2
.URLError
, OSError):
677 self
.assert_(not ftp
)
679 self
.assert_(o
.req
is req
)
680 self
.assertEqual(req
.type, "ftp")
683 class MockHTTPResponse
:
684 def __init__(self
, fp
, msg
, status
, reason
):
693 self
.req_headers
= []
695 self
.raise_on_endheaders
= False
696 def __call__(self
, host
, timeout
=socket
._GLOBAL
_DEFAULT
_TIMEOUT
):
698 self
.timeout
= timeout
700 def set_debuglevel(self
, level
):
702 def request(self
, method
, url
, body
=None, headers
={}):
705 self
.req_headers
+= headers
.items()
706 self
.req_headers
.sort()
709 if self
.raise_on_endheaders
:
def getresponse(self):
    """Return a canned MockHTTPResponse: fresh MockFile, empty headers, 200 "OK"."""
    body = MockFile()
    return MockHTTPResponse(body, {}, 200, "OK")
715 h
= urllib2
.AbstractHTTPHandler()
716 o
= h
.parent
= MockOpener()
718 url
= "http://example.com/"
719 for method
, data
in [("GET", None), ("POST", "blah")]:
720 req
= Request(url
, data
, {"Foo": "bar"})
722 req
.add_unredirected_header("Spam", "eggs")
723 http
= MockHTTPClass()
724 r
= h
.do_open(http
, req
)
727 r
.read
; r
.readline
# wrapped MockFile methods
728 r
.info
; r
.geturl
# addinfourl methods
729 r
.code
, r
.msg
== 200, "OK" # added from MockHTTPClass.getreply()
731 hdrs
.get
; hdrs
.has_key
# r.info() gives dict from .getreply()
732 self
.assertEqual(r
.geturl(), url
)
734 self
.assertEqual(http
.host
, "example.com")
735 self
.assertEqual(http
.level
, 0)
736 self
.assertEqual(http
.method
, method
)
737 self
.assertEqual(http
.selector
, "/")
738 self
.assertEqual(http
.req_headers
,
739 [("Connection", "close"),
740 ("Foo", "bar"), ("Spam", "eggs")])
741 self
.assertEqual(http
.data
, data
)
743 # check socket.error converted to URLError
744 http
.raise_on_endheaders
= True
745 self
.assertRaises(urllib2
.URLError
, h
.do_open
, http
, req
)
747 # check adding of standard headers
748 o
.addheaders
= [("Spam", "eggs")]
749 for data
in "", None: # POST, GET
750 req
= Request("http://example.com/", data
)
751 r
= MockResponse(200, "OK", {}, "")
752 newreq
= h
.do_request_(req
)
753 if data
is None: # GET
754 self
.assert_("Content-length" not in req
.unredirected_hdrs
)
755 self
.assert_("Content-type" not in req
.unredirected_hdrs
)
757 self
.assertEqual(req
.unredirected_hdrs
["Content-length"], "0")
758 self
.assertEqual(req
.unredirected_hdrs
["Content-type"],
759 "application/x-www-form-urlencoded")
760 # XXX the details of Host could be better tested
761 self
.assertEqual(req
.unredirected_hdrs
["Host"], "example.com")
762 self
.assertEqual(req
.unredirected_hdrs
["Spam"], "eggs")
764 # don't clobber existing headers
765 req
.add_unredirected_header("Content-length", "foo")
766 req
.add_unredirected_header("Content-type", "bar")
767 req
.add_unredirected_header("Host", "baz")
768 req
.add_unredirected_header("Spam", "foo")
769 newreq
= h
.do_request_(req
)
770 self
.assertEqual(req
.unredirected_hdrs
["Content-length"], "foo")
771 self
.assertEqual(req
.unredirected_hdrs
["Content-type"], "bar")
772 self
.assertEqual(req
.unredirected_hdrs
["Host"], "baz")
773 self
.assertEqual(req
.unredirected_hdrs
["Spam"], "foo")
775 def test_http_doubleslash(self
):
776 # Checks that the presence of an unnecessary double slash in a url doesn't break anything
777 # Previously, a double slash directly after the host could cause incorrect parsing of the url
778 h
= urllib2
.AbstractHTTPHandler()
779 o
= h
.parent
= MockOpener()
783 "http://example.com/foo/bar/baz.html",
784 "http://example.com//foo/bar/baz.html",
785 "http://example.com/foo//bar/baz.html",
786 "http://example.com/foo/bar//baz.html",
789 for ds_url
in ds_urls
:
790 ds_req
= Request(ds_url
, data
)
792 # Check whether host is determined correctly if there is no proxy
793 np_ds_req
= h
.do_request_(ds_req
)
794 self
.assertEqual(np_ds_req
.unredirected_hdrs
["Host"],"example.com")
796 # Check whether host is determined correctly if there is a proxy
797 ds_req
.set_proxy("someproxy:3128",None)
798 p_ds_req
= h
.do_request_(ds_req
)
799 self
.assertEqual(p_ds_req
.unredirected_hdrs
["Host"],"example.com")
801 def test_errors(self
):
802 h
= urllib2
.HTTPErrorProcessor()
803 o
= h
.parent
= MockOpener()
805 url
= "http://example.com/"
807 # all 2xx are passed through
808 r
= MockResponse(200, "OK", {}, "", url
)
809 newr
= h
.http_response(req
, r
)
810 self
.assert_(r
is newr
)
811 self
.assert_(not hasattr(o
, "proto")) # o.error not called
812 r
= MockResponse(202, "Accepted", {}, "", url
)
813 newr
= h
.http_response(req
, r
)
814 self
.assert_(r
is newr
)
815 self
.assert_(not hasattr(o
, "proto")) # o.error not called
816 r
= MockResponse(206, "Partial content", {}, "", url
)
817 newr
= h
.http_response(req
, r
)
818 self
.assert_(r
is newr
)
819 self
.assert_(not hasattr(o
, "proto")) # o.error not called
820 # anything else calls o.error (and MockOpener returns None, here)
821 r
= MockResponse(502, "Bad gateway", {}, "", url
)
822 self
.assert_(h
.http_response(req
, r
) is None)
823 self
.assertEqual(o
.proto
, "http") # o.error called
824 self
.assertEqual(o
.args
, (req
, r
, 502, "Bad gateway", {}))
826 def test_cookies(self
):
828 h
= urllib2
.HTTPCookieProcessor(cj
)
829 o
= h
.parent
= MockOpener()
831 req
= Request("http://example.com/")
832 r
= MockResponse(200, "OK", {}, "")
833 newreq
= h
.http_request(req
)
834 self
.assert_(cj
.ach_req
is req
is newreq
)
835 self
.assertEquals(req
.get_origin_req_host(), "example.com")
836 self
.assert_(not req
.is_unverifiable())
837 newr
= h
.http_response(req
, r
)
838 self
.assert_(cj
.ec_req
is req
)
839 self
.assert_(cj
.ec_r
is r
is newr
)
841 def test_redirect(self
):
842 from_url
= "http://example.com/a.html"
843 to_url
= "http://example.com/b.html"
844 h
= urllib2
.HTTPRedirectHandler()
845 o
= h
.parent
= MockOpener()
847 # ordinary redirect behaviour
848 for code
in 301, 302, 303, 307:
849 for data
in None, "blah\nblah\n":
850 method
= getattr(h
, "http_error_%s" % code
)
851 req
= Request(from_url
, data
)
852 req
.add_header("Nonsense", "viking=withhold")
854 req
.add_header("Content-Length", str(len(data
)))
855 req
.add_unredirected_header("Spam", "spam")
857 method(req
, MockFile(), code
, "Blah",
858 MockHeaders({"location": to_url
}))
859 except urllib2
.HTTPError
:
860 # 307 in response to POST requires user OK
861 self
.assert_(code
== 307 and data
is not None)
862 self
.assertEqual(o
.req
.get_full_url(), to_url
)
864 self
.assertEqual(o
.req
.get_method(), "GET")
865 except AttributeError:
866 self
.assert_(not o
.req
.has_data())
868 # now it's a GET, there should not be headers regarding content
869 # (possibly dragged from before being a POST)
870 headers
= [x
.lower() for x
in o
.req
.headers
]
871 self
.assertTrue("content-length" not in headers
)
872 self
.assertTrue("content-type" not in headers
)
874 self
.assertEqual(o
.req
.headers
["Nonsense"],
876 self
.assert_("Spam" not in o
.req
.headers
)
877 self
.assert_("Spam" not in o
.req
.unredirected_hdrs
)
880 req
= Request(from_url
)
881 def redirect(h
, req
, url
=to_url
):
882 h
.http_error_302(req
, MockFile(), 302, "Blah",
883 MockHeaders({"location": url
}))
884 # Note that the *original* request shares the same record of
885 # redirections with the sub-requests caused by the redirections.
887 # detect infinite loop redirect of a URL to itself
888 req
= Request(from_url
, origin_req_host
="example.com")
892 redirect(h
, req
, "http://example.com/")
894 except urllib2
.HTTPError
:
895 # don't stop until max_repeats, because cookies may introduce state
896 self
.assertEqual(count
, urllib2
.HTTPRedirectHandler
.max_repeats
)
898 # detect endless non-repeating chain of redirects
899 req
= Request(from_url
, origin_req_host
="example.com")
903 redirect(h
, req
, "http://example.com/%d" % count
)
905 except urllib2
.HTTPError
:
906 self
.assertEqual(count
,
907 urllib2
.HTTPRedirectHandler
.max_redirections
)
def test_cookie_redirect(self):
    # Cookies stored for one host must not be sent along when a request
    # is redirected to a different host.
    from cookielib import CookieJar
    from test.test_cookielib import interact_netscape

    # The jar must exist before it can be primed with a cookie and handed
    # to the cookie processor below.
    cj = CookieJar()
    interact_netscape(cj, "http://www.example.com/", "spam=eggs")

    # The mock HTTP handler is configured with a 302 status and a Location
    # header that names a foreign host.
    hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
    hdeh = urllib2.HTTPDefaultErrorHandler()
    hrh = urllib2.HTTPRedirectHandler()
    cp = urllib2.HTTPCookieProcessor(cj)
    o = build_test_opener(hh, hdeh, hrh, cp)
    o.open("http://www.example.com/")

    # NOTE(review): hh.req is presumably the last request the mock handler
    # received, i.e. the redirected one -- confirm against MockHTTPHandler.
    self.assertFalse(hh.req.has_header("Cookie"))
925 def test_proxy(self
):
927 ph
= urllib2
.ProxyHandler(dict(http
="proxy.example.com:3128"))
930 [("http_open", "return response")]
932 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
934 req
= Request("http://acme.example.com/")
935 self
.assertEqual(req
.get_host(), "acme.example.com")
937 self
.assertEqual(req
.get_host(), "proxy.example.com:3128")
939 self
.assertEqual([(handlers
[0], "http_open")],
940 [tup
[0:2] for tup
in o
.calls
])
942 def test_proxy_https(self
):
944 ph
= urllib2
.ProxyHandler(dict(https
='proxy.example.com:3128'))
947 [("https_open","return response")]
949 handlers
= add_ordered_mock_handlers(o
, meth_spec
)
950 req
= Request("https://www.example.com/")
951 self
.assertEqual(req
.get_host(), "www.example.com")
953 self
.assertEqual(req
.get_host(), "proxy.example.com:3128")
954 self
.assertEqual([(handlers
[0], "https_open")],
955 [tup
[0:2] for tup
in o
.calls
])
def test_basic_auth(self, quote_char='"'):
    # quote_char is the character placed on both sides of the realm in the
    # WWW-Authenticate challenge; it defaults to a double quote so that the
    # single-quote variant can reuse this method.
    opener = OpenerDirector()
    password_manager = MockPasswordManager()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    realm = "ACME Widget Store"
    challenge = ('WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
                 (quote_char, realm, quote_char))
    http_handler = MockHTTPHandler(401, challenge)
    opener.add_handler(auth_handler)
    opener.add_handler(http_handler)
    # The request URL and the protected URL are the same for direct
    # (non-proxy) authentication.
    self._test_basic_auth(opener, auth_handler, "Authorization",
                          realm, http_handler, password_manager,
                          "http://acme.example.com/protected",
                          "http://acme.example.com/protected",
                          )
def test_basic_auth_with_single_quoted_realm(self):
    # Re-run the basic-auth scenario with the realm wrapped in single
    # quotes instead of the default double quotes.
    single_quote = "'"
    self.test_basic_auth(quote_char=single_quote)
def test_proxy_basic_auth(self):
    # Basic authentication against a proxy: the mock handler is set up
    # with a 407 / Proxy-Authenticate challenge, and the credentials are
    # expected in the Proxy-authorization header.
    opener = OpenerDirector()
    ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
    opener.add_handler(ph)
    password_manager = MockPasswordManager()
    auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
    realm = "ACME Networks"
    challenge = 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm
    http_handler = MockHTTPHandler(407, challenge)
    opener.add_handler(auth_handler)
    opener.add_handler(http_handler)
    # The password lookup is expected to be keyed on the proxy's host:port
    # rather than on the requested URL.
    self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
                          realm, http_handler, password_manager,
                          "http://acme.example.com:3128/protected",
                          "proxy.example.com:3128",
                          )
def test_basic_and_digest_auth_handlers(self):
    # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
    # response (http://python.org/sf/1479302), where it should instead
    # return None to allow another handler (especially
    # HTTPBasicAuthHandler) to handle the response.

    # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
    # try digest first (since it's the strongest auth scheme), so we record
    # order of calls here to check digest comes first:
    class RecordingOpenerDirector(OpenerDirector):
        def __init__(self):
            OpenerDirector.__init__(self)
            # Names of the auth handlers, in the order they were invoked.
            self.recorded = []
        def record(self, info):
            self.recorded.append(info)
    class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
        def http_error_401(self, *args, **kwds):
            # Log the call on the opener, then defer to the real handler.
            self.parent.record("digest")
            urllib2.HTTPDigestAuthHandler.http_error_401(self,
                                                         *args, **kwds)
    class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
        def http_error_401(self, *args, **kwds):
            # Log the call on the opener, then defer to the real handler.
            self.parent.record("basic")
            urllib2.HTTPBasicAuthHandler.http_error_401(self,
                                                        *args, **kwds)

    opener = RecordingOpenerDirector()
    password_manager = MockPasswordManager()
    digest_handler = TestDigestAuthHandler(password_manager)
    basic_handler = TestBasicAuthHandler(password_manager)
    realm = "ACME Networks"
    http_handler = MockHTTPHandler(
        401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
    # The basic handler is registered first on purpose: the assertion at
    # the end checks that digest is nevertheless tried before basic.
    opener.add_handler(basic_handler)
    opener.add_handler(digest_handler)
    opener.add_handler(http_handler)

    # check basic auth isn't blocked by digest handler failing
    self._test_basic_auth(opener, basic_handler, "Authorization",
                          realm, http_handler, password_manager,
                          "http://acme.example.com/protected",
                          "http://acme.example.com/protected",
                          )
    # check digest was tried before basic (twice, because
    # _test_basic_auth called .open() twice)
    self.assertEqual(opener.recorded, ["digest", "basic"]*2)
def _test_basic_auth(self, opener, auth_handler, auth_header,
                     realm, http_handler, password_manager,
                     request_url, protected_url):
    # Shared scenario for the basic-auth tests.
    #
    # opener           -- opener with auth_handler and http_handler added
    # auth_handler     -- handler whose add_password() is exercised
    # auth_header      -- name of the header expected to carry credentials
    # realm            -- realm announced by the mock server's challenge
    # http_handler     -- mock handler exposing .requests and .reset()
    # password_manager -- mock that records what it is given and asked for
    # request_url      -- URL passed to opener.open()
    # protected_url    -- URL the password manager should be queried with
    import base64
    user, password = "wile", "coyote"

    # .add_password() fed through to password manager
    auth_handler.add_password(realm, request_url, user, password)
    self.assertEqual(realm, password_manager.realm)
    self.assertEqual(request_url, password_manager.url)
    self.assertEqual(user, password_manager.user)
    self.assertEqual(password, password_manager.password)

    opener.open(request_url)

    # should have asked the password manager for the username/password
    self.assertEqual(password_manager.target_realm, realm)
    self.assertEqual(password_manager.target_url, protected_url)

    # expect one request without authorization, then one with
    self.assertEqual(len(http_handler.requests), 2)
    self.assertFalse(http_handler.requests[0].has_header(auth_header))
    userpass = '%s:%s' % (user, password)
    # encodestring() appends a newline, hence the strip().
    auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
    self.assertEqual(http_handler.requests[1].get_header(auth_header),
                     auth_hdr_value)

    # if the password manager can't find a password, the handler won't
    # handle the HTTP auth error
    password_manager.user = password_manager.password = None
    http_handler.reset()
    opener.open(request_url)
    self.assertEqual(len(http_handler.requests), 1)
    self.assertFalse(http_handler.requests[0].has_header(auth_header))
1076 class MiscTests(unittest
.TestCase
):
1078 def test_build_opener(self
):
1079 class MyHTTPHandler(urllib2
.HTTPHandler
): pass
1080 class FooHandler(urllib2
.BaseHandler
):
1081 def foo_open(self
): pass
1082 class BarHandler(urllib2
.BaseHandler
):
1083 def bar_open(self
): pass
1085 build_opener
= urllib2
.build_opener
1087 o
= build_opener(FooHandler
, BarHandler
)
1088 self
.opener_has_handler(o
, FooHandler
)
1089 self
.opener_has_handler(o
, BarHandler
)
1091 # can take a mix of classes and instances
1092 o
= build_opener(FooHandler
, BarHandler())
1093 self
.opener_has_handler(o
, FooHandler
)
1094 self
.opener_has_handler(o
, BarHandler
)
1096 # subclasses of default handlers override default handlers
1097 o
= build_opener(MyHTTPHandler
)
1098 self
.opener_has_handler(o
, MyHTTPHandler
)
1100 # a particular case of overriding: default handlers can be passed
1103 self
.opener_has_handler(o
, urllib2
.HTTPHandler
)
1104 o
= build_opener(urllib2
.HTTPHandler
)
1105 self
.opener_has_handler(o
, urllib2
.HTTPHandler
)
1106 o
= build_opener(urllib2
.HTTPHandler())
1107 self
.opener_has_handler(o
, urllib2
.HTTPHandler
)
1109 # Issue2670: multiple handlers sharing the same base class
1110 class MyOtherHTTPHandler(urllib2
.HTTPHandler
): pass
1111 o
= build_opener(MyHTTPHandler
, MyOtherHTTPHandler
)
1112 self
.opener_has_handler(o
, MyHTTPHandler
)
1113 self
.opener_has_handler(o
, MyOtherHTTPHandler
)
1115 def opener_has_handler(self
, opener
, handler_class
):
1116 for h
in opener
.handlers
:
1117 if h
.__class
__ == handler_class
:
1122 class RequestTests(unittest
.TestCase
):
1125 self
.get
= urllib2
.Request("http://www.python.org/~jeremy/")
1126 self
.post
= urllib2
.Request("http://www.python.org/~jeremy/",
1128 headers
={"X-Test": "test"})
def test_method(self):
    # The POST fixture must report POST and the GET fixture GET.
    for expected, request in (("POST", self.post), ("GET", self.get)):
        self.assertEqual(expected, request.get_method())
def test_add_data(self):
    # Adding data to a request turns it from a GET into a POST.
    request = self.get
    self.assertFalse(request.has_data())
    self.assertEqual("GET", request.get_method())
    request.add_data("spam")
    self.assertTrue(request.has_data())
    self.assertEqual("POST", request.get_method())
def test_get_full_url(self):
    # get_full_url() hands back the URL the request was created with.
    full_url = self.get.get_full_url()
    self.assertEqual("http://www.python.org/~jeremy/", full_url)
def test_selector(self):
    # The selector is the path portion of the URL, for the fixture and
    # for a bare-root URL alike.
    fixture_selector = self.get.get_selector()
    self.assertEqual("/~jeremy/", fixture_selector)
    req = urllib2.Request("http://www.python.org/")
    root_selector = req.get_selector()
    self.assertEqual("/", root_selector)
def test_get_type(self):
    # The request type is the URL scheme.
    scheme = self.get.get_type()
    self.assertEqual("http", scheme)
def test_get_host(self):
    # The host is the network location of the fixture URL.
    host = self.get.get_host()
    self.assertEqual("www.python.org", host)
def test_get_host_unquote(self):
    # A percent-escape in the host part (%70 is "p") must come back
    # decoded from get_host().
    escaped_url = "http://www.%70ython.org/"
    req = urllib2.Request(escaped_url)
    self.assertEqual("www.python.org", req.get_host())
def test_proxy(self):
    # After set_proxy() the request targets the proxy host, while the
    # origin request host still names the original server.
    request = self.get
    self.assertFalse(request.has_proxy())
    request.set_proxy("www.perl.org", "http")
    self.assertTrue(request.has_proxy())
    self.assertEqual("www.python.org", request.get_origin_req_host())
    self.assertEqual("www.perl.org", request.get_host())
1168 def test_main(verbose
=None):
1169 from test
import test_urllib2
1170 test_support
.run_doctest(test_urllib2
, verbose
)
1171 test_support
.run_doctest(urllib2
, verbose
)
1172 tests
= (TrivialTests
,
1173 OpenerDirectorTests
,
1177 test_support
.run_unittest(*tests
)
# Script entry point: run the whole suite verbosely when executed directly.
if __name__ == "__main__":
    test_main(verbose=True)