Merged changes from master
[zeroinstall/solver.git] / zeroinstall / injector / trust.py
blob1719eedd4874747382d67110a591f0e105822066
1 """
2 Records who we trust to sign feeds.
4 Trust is divided up into domains, so that it is possible to trust a key
5 in some cases and not others.
7 @var trust_db: Singleton trust database instance.
8 """
10 # Copyright (C) 2009, Thomas Leonard
11 # See the README file for details, or visit http://0install.net.
13 from zeroinstall import _, SafeException
14 import os
15 from logging import info
17 from zeroinstall.support import basedir, tasks
18 from .namespaces import config_site, config_prog, XMLNS_TRUST
20 KEY_INFO_TIMEOUT = 10 # Maximum time to wait for response from key-info-server
class TrustDB(object):
    """A database of trusted keys.

    The database is re-read from disk on every query (see L{ensure_uptodate})
    and written back by L{save} whenever a key is trusted or untrusted.

    @ivar keys: maps each trusted key fingerprint to the set of domains in which it is trusted
    @type keys: {str: set(str)}
    @ivar watchers: callbacks invoked by L{notify}
    @see: L{trust_db} - the singleton instance of this class"""
    __slots__ = ['keys', 'watchers']

    def __init__(self):
        # keys stays None until the first ensure_uptodate() call populates it.
        self.keys = None
        self.watchers = []

    def is_trusted(self, fingerprint, domain = None):
        """Check whether a key is trusted.
        @param fingerprint: fingerprint of the key to look up
        @type fingerprint: str
        @param domain: domain to check, or None to ask whether the key is
        trusted in any domain at all (deprecated usage)
        @type domain: str | None
        @return: True if the key is trusted for this domain (a '*' entry
        matches every domain)
        @rtype: bool"""
        self.ensure_uptodate()

        domains = self.keys.get(fingerprint, None)
        if not domains:
            return False    # Unknown key (or no domains recorded for it)

        if domain is None:
            return True     # Deprecated

        return domain in domains or '*' in domains

    def get_trust_domains(self, fingerprint):
        """Return the set of domains in which this key is trusted.
        If the list includes '*' then the key is trusted everywhere.
        An unknown key yields an empty set.
        @since: 0.27
        """
        self.ensure_uptodate()
        return self.keys.get(fingerprint, set())

    def get_keys_for_domain(self, domain):
        """Return the set of keys trusted for this domain.
        Only keys that list exactly this domain are returned.
        @since: 0.27"""
        self.ensure_uptodate()
        return set([fp for fp in self.keys
                    if domain in self.keys[fp]])

    def trust_key(self, fingerprint, domain = '*'):
        """Add key to the list of trusted fingerprints.
        Does nothing if the key is already trusted for this domain.
        @param fingerprint: base 16 fingerprint without any spaces
        @type fingerprint: str
        @param domain: domain in which key is to be trusted
        @type domain: str
        @raise ValueError: if fingerprint is not a base 16 number
        @note: call L{notify} after trusting one or more new keys"""
        if self.is_trusted(fingerprint, domain):
            return

        int(fingerprint, 16)    # Ensure fingerprint is valid

        if fingerprint not in self.keys:
            self.keys[fingerprint] = set()

        # NOTE: calling trust_key() without a domain is deprecated, but no
        # warning is issued for it yet.

        self.keys[fingerprint].add(domain)
        self.save()

    def untrust_key(self, key, domain = '*'):
        """Stop trusting a key in one domain and save the database.
        The key is dropped entirely once its last domain is removed.
        @param key: fingerprint of the key
        @type key: str
        @param domain: domain in which the key is no longer to be trusted
        @type domain: str
        @raise KeyError: if the key is not currently trusted for this domain"""
        self.ensure_uptodate()
        self.keys[key].remove(domain)

        if not self.keys[key]:
            # No more domains for this key
            del self.keys[key]

        self.save()

    def save(self):
        """Write the in-memory database to C{trustdb.xml} in the user's
        configuration directory. The document is written to a temporary file
        in the same directory first and then renamed into place."""
        from xml.dom import minidom
        import tempfile

        doc = minidom.Document()
        root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys')
        root.setAttribute('xmlns', XMLNS_TRUST)
        doc.appendChild(root)

        for fingerprint in self.keys:
            keyelem = doc.createElementNS(XMLNS_TRUST, 'key')
            root.appendChild(keyelem)
            keyelem.setAttribute('fingerprint', fingerprint)
            for domain in self.keys[fingerprint]:
                domainelem = doc.createElementNS(XMLNS_TRUST, 'domain')
                domainelem.setAttribute('value', domain)
                keyelem.appendChild(domainelem)

        d = basedir.save_config_path(config_site, config_prog)
        target = os.path.join(d, 'trustdb.xml')
        backup = os.path.join(d, 'trustdb.xml.bak')

        fd, tmpname = tempfile.mkstemp(dir = d, prefix = 'trust-')
        written = False
        try:
            tmp = os.fdopen(fd, 'wb')
            try:
                doc.writexml(tmp, indent = "", addindent = " ", newl = "\n")
            finally:
                # Always release the handle, even if serialisation fails.
                tmp.close()
            written = True
        finally:
            if not written:
                # Don't leave a half-written temporary file behind.
                try:
                    os.unlink(tmpname)
                except OSError:
                    pass

        try:
            os.rename(tmpname, target)
        except OSError:
            # Fix for rename operation not working in windows if file dest exists
            try:
                os.unlink(backup)
            except OSError:
                pass
            os.rename(target, backup)
            os.rename(tmpname, target)

    def notify(self):
        """Call all watcher callbacks.
        This should be called after trusting or untrusting one or more new keys.
        @since: 0.25"""
        for w in self.watchers:
            w()

    def ensure_uptodate(self):
        """Reload L{keys} from disk, replacing whatever was held in memory.
        Reads C{trustdb.xml} if present, otherwise the older plain-text
        C{trust} file, otherwise falls back to a built-in default."""
        from xml.dom import minidom

        # This is a bit inefficient... (could cache things)
        self.keys = {}

        trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml')
        if trust:
            keys = minidom.parse(trust).documentElement
            for key in keys.getElementsByTagNameNS(XMLNS_TRUST, 'key'):
                domains = set()
                self.keys[key.getAttribute('fingerprint')] = domains
                for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'):
                    domains.add(domain.getAttribute('value'))
            return

        # Fall back to the old plain-text database: one fingerprint per
        # line, each trusted in every domain.
        trust = basedir.load_first_config(config_site, config_prog, 'trust')
        if trust:
            stream = open(trust)
            try:
                contents = stream.read()
            finally:
                stream.close()
            for key in contents.split('\n'):
                if key:
                    self.keys[key] = set(['*'])
        else:
            # No trust database found.
            # Trust Thomas Leonard's key for 0install.net by default.
            # Avoids distracting confirmation box on first run when we check
            # for updates to the GUI.
            self.keys['92429807C9853C0744A68B9AAE07828059A53CC1'] = set(['0install.net'])
def domain_from_url(url):
    """Extract the trust domain for a URL.
    The trust domain is the network-location part of the URL.
    @param url: the feed's URL
    @type url: str
    @return: the trust domain
    @rtype: str
    @since: 0.27
    @raise SafeException: the URL can't be parsed"""
    # The parser lives in `urlparse` on Python 2 and in `urllib.parse` on
    # Python 3; try the original location first so Python 2 is unaffected.
    try:
        from urlparse import urlparse
    except ImportError:
        from urllib.parse import urlparse

    if os.path.isabs(url):
        raise SafeException(_("Can't get domain from a local path: '%s'") % url)

    domain = urlparse(url)[1]   # element 1 of the parse result is the netloc
    # '*' is rejected because it is the wildcard meaning "every domain".
    if domain and domain != '*':
        return domain
    raise SafeException(_("Can't extract domain from URL '%s'") % url)
# Singleton trust database instance; its keys are loaded from disk on first query.
trust_db = TrustDB()
class TrustMgr(object):
    """A TrustMgr handles the process of deciding whether to trust new keys
    (contacting the key information server, prompting the user, accepting automatically, etc)
    @ivar config: configuration object providing the C{fetcher} and C{handler} used here
    @since: 0.53"""

    __slots__ = ['config', '_current_confirm']

    def __init__(self, config):
        self.config = config
        self._current_confirm = None    # (a lock to prevent asking the user multiple questions at once)

    # `async` is a reserved word from Python 3.7 on, so `@tasks.async` cannot
    # even be parsed there. getattr() fetches the very same decorator and is
    # valid syntax on every Python version.
    @getattr(tasks, 'async')
    def confirm_keys(self, pending):
        """We don't trust any of the signatures yet. Collect information about them and
        ask the handler (C{config.handler.confirm_import_feed}) to confirm the keys,
        unless one of them has become trusted for the feed's domain in the meantime.
        Only one confirmation is in progress at a time; later calls wait their turn.
        @since: 0.53
        @arg pending: an object holding details of the updated feed
        @type pending: L{PendingFeed}
        @return: A blocker that triggers when the user has chosen, or None if already done.
        @rtype: None | L{Blocker}
        @raise SafeException: if none of the signatures is valid"""

        assert pending.sigs

        from zeroinstall.injector import gpg
        valid_sigs = [s for s in pending.sigs if isinstance(s, gpg.ValidSig)]
        if not valid_sigs:
            def format_sig(sig):
                # Describe one signature, appending any GPG output for it.
                msg = str(sig)
                if sig.messages:
                    msg += "\nMessages from GPG:\n" + sig.messages
                return msg
            raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
                    {'url': pending.url, 'signatures': ''.join(['\n- ' + format_sig(s) for s in pending.sigs])})

        # Start downloading information about the keys...
        fetcher = self.config.fetcher
        kfs = {}
        for sig in valid_sigs:
            kfs[sig] = fetcher.fetch_key_info(sig.fingerprint)

        # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
        # box update while the user is looking at it, and may allow it to be skipped completely in some
        # cases.
        timeout = tasks.TimeoutBlocker(KEY_INFO_TIMEOUT, "key info timeout")
        while True:
            key_info_blockers = [sig_info.blocker for sig_info in kfs.values() if sig_info.blocker is not None]
            if not key_info_blockers:
                break
            info("Waiting for response from key-info server: %s", key_info_blockers)
            yield [timeout] + key_info_blockers
            if timeout.happened:
                info("Timeout waiting for key info response")
                break

        # If we're already confirming something else, wait for that to finish...
        while self._current_confirm is not None:
            info("Waiting for previous key confirmations to finish")
            yield self._current_confirm

        # Check whether we still need to confirm. The user may have
        # already approved one of the keys while dealing with another
        # feed.
        domain = domain_from_url(pending.url)
        for sig in kfs:
            is_trusted = trust_db.is_trusted(sig.fingerprint, domain)
            if is_trusted:
                return

        # Take the lock and confirm this feed
        self._current_confirm = lock = tasks.Blocker('confirm key lock')
        try:
            done = self.config.handler.confirm_import_feed(pending, kfs)
            if done is not None:
                yield done
                tasks.check(done)
        finally:
            # Release the lock and wake anyone waiting on it, even on error.
            self._current_confirm = None
            lock.trigger()