Fullscreen support, UI fixes, reset improved
[smpy-maemo.git] / mechanize / _auth.py
blob9bb58730196cca21c341bccb93edf6bb4cbe15be
1 """HTTP Authentication and Proxy support.
3 All but HTTPProxyPasswordMgr come from Python 2.5.
6 Copyright 2006 John J. Lee <jjl@pobox.com>
8 This code is free software; you can redistribute it and/or modify it under
9 the terms of the BSD or ZPL 2.1 licenses (see the file COPYING.txt
10 included with the distribution).
12 """
import base64
import copy
import md5
import os
import posixpath
import random
import re
import sha
import sys
import time
import urlparse

from urllib import getproxies, unquote, splittype, splituser, splitpasswd, \
     splitport
from urllib2 import BaseHandler, HTTPError, parse_keqv_list, parse_http_list
21 def _parse_proxy(proxy):
22 """Return (scheme, user, password, host/port) given a URL or an authority.
24 If a URL is supplied, it must have an authority (host:port) component.
25 According to RFC 3986, having an authority component means the URL must
26 have two slashes after the scheme:
28 >>> _parse_proxy('file:/ftp.example.com/')
29 Traceback (most recent call last):
30 ValueError: proxy URL with no authority: 'file:/ftp.example.com/'
32 The first three items of the returned tuple may be None.
34 Examples of authority parsing:
36 >>> _parse_proxy('proxy.example.com')
37 (None, None, None, 'proxy.example.com')
38 >>> _parse_proxy('proxy.example.com:3128')
39 (None, None, None, 'proxy.example.com:3128')
41 The authority component may optionally include userinfo (assumed to be
42 username:password):
44 >>> _parse_proxy('joe:password@proxy.example.com')
45 (None, 'joe', 'password', 'proxy.example.com')
46 >>> _parse_proxy('joe:password@proxy.example.com:3128')
47 (None, 'joe', 'password', 'proxy.example.com:3128')
49 Same examples, but with URLs instead:
51 >>> _parse_proxy('http://proxy.example.com/')
52 ('http', None, None, 'proxy.example.com')
53 >>> _parse_proxy('http://proxy.example.com:3128/')
54 ('http', None, None, 'proxy.example.com:3128')
55 >>> _parse_proxy('http://joe:password@proxy.example.com/')
56 ('http', 'joe', 'password', 'proxy.example.com')
57 >>> _parse_proxy('http://joe:password@proxy.example.com:3128')
58 ('http', 'joe', 'password', 'proxy.example.com:3128')
60 Everything after the authority is ignored:
62 >>> _parse_proxy('ftp://joe:password@proxy.example.com/rubbish:3128')
63 ('ftp', 'joe', 'password', 'proxy.example.com')
65 Test for no trailing '/' case:
67 >>> _parse_proxy('http://joe:password@proxy.example.com')
68 ('http', 'joe', 'password', 'proxy.example.com')
70 """
71 scheme, r_scheme = splittype(proxy)
72 if not r_scheme.startswith("/"):
73 # authority
74 scheme = None
75 authority = proxy
76 else:
77 # URL
78 if not r_scheme.startswith("//"):
79 raise ValueError("proxy URL with no authority: %r" % proxy)
80 # We have an authority, so for RFC 3986-compliant URLs (by ss 3.
81 # and 3.3.), path is empty or starts with '/'
82 end = r_scheme.find("/", 2)
83 if end == -1:
84 end = None
85 authority = r_scheme[2:end]
86 userinfo, hostport = splituser(authority)
87 if userinfo is not None:
88 user, password = splitpasswd(userinfo)
89 else:
90 user = password = None
91 return scheme, user, password, hostport
class ProxyHandler(BaseHandler):
    """Send requests through a per-scheme proxy.

    *proxies* maps a URL scheme (e.g. 'http') to a proxy URL or authority.
    When omitted, the mapping returned by getproxies() is used.  For every
    entry a ``<scheme>_open`` attribute is installed on the instance that
    forwards to proxy_open() with that entry's proxy.
    """

    # Proxies must be in front
    handler_order = 100

    def __init__(self, proxies=None):
        if proxies is None:
            proxies = getproxies()
        assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
        self.proxies = proxies
        for scheme, url in proxies.items():
            # Bind the current values as lambda defaults; a plain closure
            # would see only the last scheme/url of the loop.
            setattr(self, '%s_open' % scheme,
                    lambda r, proxy=url, type=scheme, meth=self.proxy_open: \
                    meth(r, proxy, type))

    def proxy_open(self, req, proxy, type):
        """Point *req* at *proxy*, adding Basic proxy credentials if given.

        Returns None when the proxy speaks the same scheme as the request
        (so the remaining handlers carry on), otherwise the result of
        re-opening the request through the parent opener.
        """
        orig_type = req.get_type()
        proxy_type, user, password, hostport = _parse_proxy(proxy)
        if proxy_type is None:
            proxy_type = orig_type
        if user and password:
            user_pass = '%s:%s' % (unquote(user), unquote(password))
            # b64encode() never inserts line breaks.  encodestring() wraps
            # its output every 76 characters, which put a newline in the
            # middle of the header value for long credentials.
            creds = base64.b64encode(user_pass).strip()
            req.add_header('Proxy-authorization', 'Basic ' + creds)
        hostport = unquote(hostport)
        req.set_proxy(hostport, proxy_type)
        if orig_type == proxy_type:
            # let other handlers take care of it
            return None
        # need to start over, because the other handlers don't
        # grok the proxy's URL type
        # e.g. if we have a constructor arg proxies like so:
        # {'http': 'ftp://proxy.example.com'}, we may end up turning
        # a request for http://acme.example.com/a into one for
        # ftp://proxy.example.com/a
        return self.parent.open(req)
class HTTPPasswordMgr:
    """In-memory store of (user, password) pairs keyed by realm and URI.

    self.passwd maps realm -> {tuple of reduced URIs -> (user, password)},
    where a reduced URI is the (authority, path) pair produced by
    reduce_uri().
    """

    def __init__(self):
        self.passwd = {}

    def add_password(self, realm, uri, user, passwd):
        """Register credentials for *realm* under one URI or several.

        Each entry is stored twice, once with the scheme's default port
        made explicit and once without, so lookups match either spelling.
        """
        # uri could be a single URI or a sequence
        if isinstance(uri, basestring):
            uri = [uri]
        by_uri = self.passwd.setdefault(realm, {})
        for default_port in True, False:
            reduced_uri = tuple(
                [self.reduce_uri(u, default_port) for u in uri])
            by_uri[reduced_uri] = (user, passwd)

    def find_user_password(self, realm, authuri):
        """Return the (user, password) stored for *authuri* in *realm*.

        Returns (None, None) when no stored URI is a base of *authuri*.
        """
        domains = self.passwd.get(realm, {})
        for default_port in True, False:
            reduced_authuri = self.reduce_uri(authuri, default_port)
            for uris, authinfo in domains.items():
                for uri in uris:
                    if self.is_suburi(uri, reduced_authuri):
                        return authinfo
        return None, None

    def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            # Spell out the well-known port so "http://h/" and
            # "http://h:80/" reduce to the same authority.
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path

    def is_suburi(self, base, test):
        """Check if test is below base in a URI tree

        Both args must be URIs in reduced form.
        """
        if base == test:
            return True
        if base[0] != test[0]:
            return False
        # Compare whole path segments.  A plain common-prefix check treats
        # "/foobar" as lying below "/foo" and hands its credentials to an
        # unrelated resource.
        prefix = base[1]
        if prefix[-1:] != '/':
            prefix += '/'
        return test[1].startswith(prefix)
class HTTPPasswordMgrWithDefaultRealm(HTTPPasswordMgr):
    """Password manager that falls back to the None realm.

    Credentials registered under realm None act as a default for any realm
    that has no matching entry of its own.
    """

    def find_user_password(self, realm, authuri):
        """Look up *authuri* in *realm* first, then in the None realm."""
        lookup = HTTPPasswordMgr.find_user_password
        user, password = lookup(self, realm, authuri)
        if user is None:
            # Nothing stored for this realm: try the catch-all entries.
            return lookup(self, None, authuri)
        return user, password
class AbstractBasicAuthHandler:
    """Shared logic for answering HTTP Basic authentication challenges.

    Subclasses supply ``auth_header`` (the request header to set) and are
    expected to provide ``parent`` (the opener used to retry the request).
    """

    rx = re.compile('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', re.I)

    # XXX there can actually be multiple auth-schemes in a
    # www-authenticate header.  should probably be a lot more careful
    # in parsing them to extract multiple alternatives

    def __init__(self, password_mgr=None):
        if password_mgr is None:
            password_mgr = HTTPPasswordMgr()
        self.passwd = password_mgr
        self.add_password = self.passwd.add_password

    def http_error_auth_reqed(self, authreq, host, req, headers):
        """Retry *req* with Basic credentials if the challenge asks for them.

        *authreq* is the name of the challenge header to read from
        *headers*.  Returns the retried response, or None when there is no
        Basic challenge or no usable credentials.
        """
        # host may be an authority (without userinfo) or a URL with an
        # authority
        # XXX could be multiple headers
        challenge = headers.get(authreq, None)
        if not challenge:
            return None
        mo = AbstractBasicAuthHandler.rx.search(challenge)
        if mo is None:
            return None
        scheme, realm = mo.groups()
        if scheme.lower() == 'basic':
            return self.retry_http_basic_auth(host, req, realm)
        return None

    def retry_http_basic_auth(self, host, req, realm):
        """Re-issue *req* with an Authorization-style header for *realm*.

        Returns None when no password is known, or when the request
        already carried exactly these credentials (so retrying is futile).
        """
        user, pw = self.passwd.find_user_password(realm, host)
        if pw is None:
            return None
        raw = "%s:%s" % (user, pw)
        # b64encode() never inserts line breaks.  encodestring() wraps its
        # output every 76 characters, which broke the header for long
        # user:password pairs.
        auth = 'Basic %s' % base64.b64encode(raw).strip()
        if req.headers.get(self.auth_header, None) == auth:
            return None
        newreq = copy.copy(req)
        newreq.add_header(self.auth_header, auth)
        newreq.visit = False
        return self.parent.open(newreq)
class HTTPBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
    """Answer 401 responses using HTTP Basic authentication."""

    auth_header = 'Authorization'

    def http_error_401(self, req, fp, code, msg, headers):
        """Handle a 401 by retrying with credentials for the request URL."""
        # The full URL (not just the authority) is what gets matched
        # against the password manager's entries.
        full_url = req.get_full_url()
        response = self.http_error_auth_reqed(
            'www-authenticate', full_url, req, headers)
        return response
class ProxyBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
    """Answer 407 responses using HTTP Basic proxy authentication."""

    auth_header = 'Proxy-authorization'

    def http_error_407(self, req, fp, code, msg, headers):
        """Handle a 407 by retrying with credentials for the request host."""
        # http_error_auth_reqed requires that there is no userinfo component
        # in authority.  Assume there isn't one, since urllib2 does not (and
        # should not, RFC 3986 s. 3.2.1) support requests for URLs
        # containing userinfo.
        proxy_authority = req.get_host()
        response = self.http_error_auth_reqed(
            'proxy-authenticate', proxy_authority, req, headers)
        return response
def randombytes(n):
    """Return n random bytes.

    Uses the operating system's randomness source through os.urandom().
    If the platform provides none (os.urandom raises NotImplementedError),
    falls back to the random module.
    """
    try:
        return os.urandom(n)
    except NotImplementedError:
        return "".join([chr(random.randrange(0, 256)) for i in range(n)])
class AbstractDigestAuthHandler:
    """Shared logic for answering HTTP Digest authentication challenges.

    Subclasses supply ``auth_header`` (the request header to set) and are
    expected to provide ``parent`` (the opener used to retry the request).
    """
    # Digest authentication is specified in RFC 2617.

    # XXX The client does not inspect the Authentication-Info header
    # in a successful response.

    # XXX It should be possible to test this implementation against
    # a mock server that just generates a static set of challenges.

    # XXX qop="auth-int" supports is shaky

    def __init__(self, passwd=None):
        if passwd is None:
            passwd = HTTPPasswordMgr()
        self.passwd = passwd
        self.add_password = self.passwd.add_password
        self.retried = 0
        self.nonce_count = 0

    def reset_retry_count(self):
        """Forget earlier failed attempts."""
        self.retried = 0

    def http_error_auth_reqed(self, auth_header, host, req, headers):
        """Retry *req* with Digest credentials if the challenge asks for them.

        *auth_header* names the challenge header to read from *headers*.
        Raises HTTPError once more than five attempts have been made
        without a reset; returns None when there is no Digest challenge.
        """
        authreq = headers.get(auth_header, None)
        if self.retried > 5:
            # Don't fail endlessly - if we failed once, we'll probably
            # fail a second time. Hm. Unless the Password Manager is
            # prompting for the information. Crap. This isn't great
            # but it's better than the current 'repeat until recursion
            # depth exceeded' approach <wink>
            raise HTTPError(req.get_full_url(), 401, "digest auth failed",
                            headers, None)
        self.retried += 1
        if authreq:
            scheme = authreq.split()[0]
            if scheme.lower() == 'digest':
                return self.retry_http_digest_auth(req, authreq)
        return None

    def retry_http_digest_auth(self, req, auth):
        """Re-issue *req* with a Digest response to the challenge *auth*.

        Returns None when no authorization value can be built, or when the
        request already carried exactly this value.
        """
        token, challenge = auth.split(' ', 1)
        chal = parse_keqv_list(parse_http_list(challenge))
        auth = self.get_authorization(req, chal)
        if not auth:
            return None
        auth_val = 'Digest %s' % auth
        if req.headers.get(self.auth_header, None) == auth_val:
            return None
        newreq = copy.copy(req)
        newreq.add_unredirected_header(self.auth_header, auth_val)
        newreq.visit = False
        return self.parent.open(newreq)

    def get_cnonce(self, nonce):
        """Return a 16-character client nonce."""
        # The cnonce-value is an opaque
        # quoted string value provided by the client and used by both client
        # and server to avoid chosen plaintext attacks, to provide mutual
        # authentication, and to provide some message integrity protection.
        # This isn't a fabulous effort, but it's probably Good Enough.
        dig = sha.new("%s:%s:%s:%s" % (self.nonce_count, nonce, time.ctime(),
                                       randombytes(8))).hexdigest()
        return dig[:16]

    def get_authorization(self, req, chal):
        """Build the Digest credentials string for the parsed challenge.

        *chal* is the challenge as a dict of directive -> value.  Returns
        None when the challenge lacks realm or nonce, names an algorithm or
        qop this class cannot handle, or no user is known for the realm.
        """
        try:
            realm = chal['realm']
            nonce = chal['nonce']
        except KeyError:
            return None
        qop = chal.get('qop')
        algorithm = chal.get('algorithm', 'MD5')
        # mod_digest doesn't send an opaque, even though it isn't
        # supposed to be optional
        opaque = chal.get('opaque', None)

        H, KD = self.get_algorithm_impls(algorithm)
        if H is None:
            return None

        user, pw = self.passwd.find_user_password(realm, req.get_full_url())
        if user is None:
            return None

        # The server may offer several qop options in one comma-separated
        # value; answer with "auth" whenever it is among them.  A challenge
        # offering only options we cannot do (e.g. auth-int) is declined
        # instead of crashing on an unset response digest.
        if qop is None:
            use_qop = False
        elif 'auth' in [option.strip() for option in qop.split(',')]:
            use_qop = True
        else:
            # XXX handle auth-int.
            return None

        # XXX not implemented yet
        if req.has_data():
            entdig = self.get_entity_digest(req.get_data(), chal)
        else:
            entdig = None

        A1 = "%s:%s:%s" % (user, realm, pw)
        A2 = "%s:%s" % (req.get_method(),
                        # XXX selector: what about proxies and full urls
                        req.get_selector())
        if use_qop:
            self.nonce_count += 1
            ncvalue = '%08x' % self.nonce_count
            cnonce = self.get_cnonce(nonce)
            noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, 'auth',
                                           H(A2))
            respdig = KD(H(A1), noncebit)
        else:
            respdig = KD(H(A1), "%s:%s" % (nonce, H(A2)))

        # XXX should the partial digests be encoded too?

        base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
               'response="%s"' % (user, realm, nonce, req.get_selector(),
                                  respdig)
        if opaque:
            base += ', opaque="%s"' % opaque
        if entdig:
            base += ', digest="%s"' % entdig
        base += ', algorithm="%s"' % algorithm
        if use_qop:
            base += ', qop=auth, nc=%s, cnonce="%s"' % (ncvalue, cnonce)
        return base

    def get_algorithm_impls(self, algorithm):
        """Return the (H, KD) functions for *algorithm*.

        Returns (None, None) for an algorithm that is not implemented.
        """
        # lambdas assume digest modules are imported at the top level
        if algorithm == 'MD5':
            H = lambda x: md5.new(x).hexdigest()
        elif algorithm == 'SHA':
            H = lambda x: sha.new(x).hexdigest()
        else:
            # XXX MD5-sess
            return None, None
        KD = lambda s, d: H("%s:%s" % (s, d))
        return H, KD

    def get_entity_digest(self, data, chal):
        # XXX not implemented yet
        return None
class HTTPDigestAuthHandler(BaseHandler, AbstractDigestAuthHandler):
    """An authentication protocol defined by RFC 2069

    Digest authentication improves on basic authentication because it
    does not transmit passwords in the clear.
    """

    auth_header = 'Authorization'
    handler_order = 490

    def http_error_401(self, req, fp, code, msg, headers):
        """Handle a 401 by answering a Digest challenge, if one was sent."""
        full_url = req.get_full_url()
        # Item 1 of the parsed URL is its authority (host[:port]) part.
        authority = urlparse.urlparse(full_url)[1]
        response = self.http_error_auth_reqed(
            'www-authenticate', authority, req, headers)
        # The attempt has finished, so the next 401 starts from zero.
        self.reset_retry_count()
        return response
class ProxyDigestAuthHandler(BaseHandler, AbstractDigestAuthHandler):
    """Answer 407 responses using HTTP Digest proxy authentication."""

    auth_header = 'Proxy-Authorization'
    handler_order = 490

    def http_error_407(self, req, fp, code, msg, headers):
        """Handle a 407 by answering a Digest challenge, if one was sent."""
        proxy_host = req.get_host()
        response = self.http_error_auth_reqed(
            'proxy-authenticate', proxy_host, req, headers)
        # The attempt has finished, so the next 407 starts from zero.
        self.reset_retry_count()
        return response
447 # XXX ugly implementation, should probably not bother deriving
class HTTPProxyPasswordMgr(HTTPPasswordMgr):
    """Password manager with a default realm and a default host/port.

    A realm of None acts as the default realm; a URI of None acts as a
    default that matches whatever host/port is being looked up.
    """
    # has default realm and host/port

    def add_password(self, realm, uri, user, passwd):
        """Register credentials for *realm* under one URI, several, or None.

        Unlike the base class, each URI gets its own entry keyed by its
        reduced form (or by None for the default URI).
        """
        # uri could be a single URI or a sequence
        if uri is None or isinstance(uri, basestring):
            uris = [uri]
        else:
            uris = uri
        passwd_by_domain = self.passwd.setdefault(realm, {})
        for one_uri in uris:
            for default_port in True, False:
                reduced_uri = self.reduce_uri(one_uri, default_port)
                passwd_by_domain[reduced_uri] = (user, passwd)

    def find_user_password(self, realm, authuri):
        """Return the (user, password) for *authuri*, or (None, None).

        Search order: explicit URIs in *realm*, explicit URIs in the
        default realm, then the default-URI entries of each in turn.
        """
        attempts = [(realm, authuri), (None, authuri)]
        # bleh, want default realm to take precedence over default
        # URI/authority, hence this outer loop
        for default_uri in False, True:
            for realm, authuri in attempts:
                authinfo_by_domain = self.passwd.get(realm, {})
                for default_port in True, False:
                    reduced_authuri = self.reduce_uri(authuri, default_port)
                    for uri, authinfo in authinfo_by_domain.items():
                        if uri is None and not default_uri:
                            # Default-URI entries are only eligible on
                            # the second pass of the outer loop.
                            continue
                        if self.is_suburi(uri, reduced_authuri):
                            return authinfo
        return None, None

    def reduce_uri(self, uri, default_port=True):
        """Reduce *uri* as the base class does; None passes through as None."""
        if uri is None:
            return None
        return HTTPPasswordMgr.reduce_uri(self, uri, default_port)

    def is_suburi(self, base, test):
        """Check if test is below base, where a None base matches any host."""
        if base is None:
            # default to the proxy's host/port
            hostport, path = test
            base = (hostport, "/")
        return HTTPPasswordMgr.is_suburi(self, base, test)
class HTTPSClientCertMgr(HTTPPasswordMgr):
    """Store client key/certificate file pairs per URI.

    The password store is reused as-is: the key file sits in the "user"
    slot, the certificate file in the "password" slot, and the realm is
    always None.
    """
    # implementation inheritance: this is not a proper subclass

    def add_key_cert(self, uri, key_file, cert_file):
        """Register *key_file* and *cert_file* for *uri*."""
        no_realm = None
        self.add_password(no_realm, uri, key_file, cert_file)

    def find_key_cert(self, authuri):
        """Return the (key_file, cert_file) stored for *authuri*.

        Returns (None, None) when nothing matches.
        """
        key_and_cert = HTTPPasswordMgr.find_user_password(self, None, authuri)
        return key_and_cert