More Python 3 support
[zeroinstall/solver.git] / zeroinstall / injector / trust.py
blob7f9ed18a4e52f4fc0e4d90365690b445d235c1f7
1 """
2 Records who we trust to sign feeds.
4 Trust is divided up into domains, so that it is possible to trust a key
5 in some cases and not others.
7 @var trust_db: Singleton trust database instance.
8 """
10 # Copyright (C) 2009, Thomas Leonard
11 # See the README file for details, or visit http://0install.net.
13 from zeroinstall import _, SafeException
14 import os
15 from logging import info
17 from zeroinstall import support
18 from zeroinstall.support import basedir, tasks
19 from .namespaces import config_site, config_prog, XMLNS_TRUST
# Maximum time, in seconds, to wait for a response from the key-info server
# (used by TrustMgr.confirm_keys before it stops waiting for key information).
KEY_INFO_TIMEOUT = 10
class TrustDB(object):
    """A database of trusted keys.
    @ivar keys: maps each trusted key fingerprint to the set of domains in which it is trusted
    @type keys: {str: set(str)}
    @ivar watchers: callbacks invoked by L{notify}
    @see: L{trust_db} - the singleton instance of this class"""
    __slots__ = ['keys', 'watchers']

    def __init__(self):
        # Nothing is loaded yet; every query method calls ensure_uptodate() first.
        self.keys = None
        self.watchers = []

    def is_trusted(self, fingerprint, domain = None):
        """Check whether a key is trusted.
        @param fingerprint: fingerprint of the key to look up
        @type fingerprint: str
        @param domain: domain to check; None (deprecated) accepts a key trusted in any domain
        @type domain: str | None
        @return: True if the key is trusted for this domain, or is trusted everywhere ('*')
        @rtype: bool"""
        self.ensure_uptodate()

        domains = self.keys.get(fingerprint, None)
        if not domains: return False    # Unknown key

        if domain is None:
            return True                 # Deprecated

        return domain in domains or '*' in domains

    def get_trust_domains(self, fingerprint):
        """Return the set of domains in which this key is trusted.
        If the list includes '*' then the key is trusted everywhere.
        An unknown key gives an empty set.
        @since: 0.27
        """
        self.ensure_uptodate()
        return self.keys.get(fingerprint, set())

    def get_keys_for_domain(self, domain):
        """Return the set of keys trusted for this domain.
        Only keys listing exactly this domain are returned; keys trusted
        everywhere ('*') are included only when domain is itself '*'.
        @since: 0.27"""
        self.ensure_uptodate()
        return set([fp for fp in self.keys
                    if domain in self.keys[fp]])

    def trust_key(self, fingerprint, domain = '*'):
        """Add key to the list of trusted fingerprints.
        @param fingerprint: base 16 fingerprint without any spaces
        @type fingerprint: str
        @param domain: domain in which key is to be trusted
        @type domain: str
        @raise ValueError: the fingerprint is not a base 16 number
        @note: call L{notify} after trusting one or more new keys"""
        # is_trusted() also reloads self.keys from disk.
        if self.is_trusted(fingerprint, domain): return

        int(fingerprint, 16)        # Ensure fingerprint is valid

        if fingerprint not in self.keys:
            self.keys[fingerprint] = set()

        # NOTE: relying on the default domain '*' was marked as deprecated here,
        # but no warning is issued.

        self.keys[fingerprint].add(domain)
        self.save()

    def untrust_key(self, key, domain = '*'):
        """Stop trusting a key in one domain, and save the database.
        @param key: fingerprint of the key
        @type key: str
        @param domain: domain in which the key is no longer to be trusted
        @type domain: str
        @raise KeyError: the key is not currently trusted in this domain"""
        self.ensure_uptodate()
        self.keys[key].remove(domain)

        if not self.keys[key]:
            # No more domains for this key
            del self.keys[key]

        self.save()

    def save(self):
        """Write L{keys} to 'trustdb.xml' in the user's configuration directory.
        The document is first written to a temporary file in the same directory
        and then moved into place with support.portable_rename. If anything
        fails, the temporary file is removed rather than left behind."""
        from xml.dom import minidom
        import tempfile

        doc = minidom.Document()
        root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys')
        root.setAttribute('xmlns', XMLNS_TRUST)
        doc.appendChild(root)

        for fingerprint in self.keys:
            keyelem = doc.createElementNS(XMLNS_TRUST, 'key')
            root.appendChild(keyelem)
            keyelem.setAttribute('fingerprint', fingerprint)
            for domain in self.keys[fingerprint]:
                domainelem = doc.createElementNS(XMLNS_TRUST, 'domain')
                domainelem.setAttribute('value', domain)
                keyelem.appendChild(domainelem)

        d = basedir.save_config_path(config_site, config_prog)

        # delete = False so that the file survives being closed and can be renamed.
        tmp = tempfile.NamedTemporaryFile(dir = d, prefix = 'trust-', delete = False, mode = 'wt')
        try:
            with tmp:
                doc.writexml(tmp, indent = "", addindent = " ", newl = "\n", encoding = 'utf-8')
            support.portable_rename(tmp.name, os.path.join(d, 'trustdb.xml'))
        except BaseException:
            # Don't litter the configuration directory with 'trust-*' files.
            if os.path.exists(tmp.name):
                os.unlink(tmp.name)
            raise

    def notify(self):
        """Call all watcher callbacks.
        This should be called after trusting or untrusting one or more new keys.
        @since: 0.25"""
        for w in self.watchers: w()

    @staticmethod
    def _parse_legacy_keys(lines):
        """Turn the lines of an old-format 'trust' file into a keys mapping.
        Each non-blank line holds one fingerprint, which is trusted everywhere ('*').
        Surrounding whitespace (including the line terminator) is not part of
        the fingerprint.
        @param lines: the lines of the file
        @type lines: iterable of str
        @rtype: {str: set(str)}"""
        keys = {}
        for line in lines:
            fingerprint = line.strip()
            if fingerprint:
                keys[fingerprint] = set(['*'])
        return keys

    def ensure_uptodate(self):
        """Reload L{keys} from disk, discarding whatever was in memory.
        Sources, in order of preference: 'trustdb.xml', then the old-format
        'trust' file, then a built-in default."""
        from xml.dom import minidom

        # This is a bit inefficient... (could cache things)
        self.keys = {}

        trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml')
        if trust:
            keys = minidom.parse(trust).documentElement
            for key in keys.getElementsByTagNameNS(XMLNS_TRUST, 'key'):
                domains = set()
                self.keys[key.getAttribute('fingerprint')] = domains
                for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'):
                    domains.add(domain.getAttribute('value'))
        else:
            # Load the old database; it is written out as XML by the next save()
            trust = basedir.load_first_config(config_site, config_prog, 'trust')
            if trust:
                with open(trust, 'rt') as stream:
                    self.keys = self._parse_legacy_keys(stream)
            else:
                # No trust database found.
                # Trust Thomas Leonard's key for 0install.net by default.
                # Avoids distracting confirmation box on first run when we check
                # for updates to the GUI.
                self.keys['92429807C9853C0744A68B9AAE07828059A53CC1'] = set(['0install.net'])
def domain_from_url(url):
    """Return the trust domain of a feed URL: its network-location component.
    @param url: the feed's URL
    @type url: str
    @return: the trust domain
    @rtype: str
    @since: 0.27
    @raise SafeException: the URL is a local path, or no domain can be extracted from it"""
    try:
        from urllib.parse import urlparse   # Python 3
    except ImportError:
        from urlparse import urlparse       # Python 2

    # Absolute local paths have no domain at all.
    if os.path.isabs(url):
        raise SafeException(_("Can't get domain from a local path: '%s'") % url)

    netloc = urlparse(url)[1]
    # '*' is reserved as the "trusted everywhere" marker, so it can't be a real domain.
    if not netloc or netloc == '*':
        raise SafeException(_("Can't extract domain from URL '%s'") % url)
    return netloc
# The singleton trust database instance (see the module docstring).
# TrustMgr.confirm_keys uses it to record and query trusted keys.
trust_db = TrustDB()
class TrustMgr(object):
    """A TrustMgr handles the process of deciding whether to trust new keys
    (contacting the key information server, prompting the user, accepting automatically, etc)
    @since: 0.53"""

    __slots__ = ['config', '_current_confirm']

    def __init__(self, config):
        """@param config: provides the fetcher, handler, iface_cache and auto_approve_keys
        setting read by L{confirm_keys}"""
        self.config = config
        self._current_confirm = None    # (a lock to prevent asking the user multiple questions at once)

    # NOTE(review): 'async' is a reserved word from Python 3.7, so this decorator
    # line only parses on older interpreters - confirm the supported versions.
    @tasks.async
    def confirm_keys(self, pending):
        """We don't trust any of the signatures yet. Collect information about them and add the keys to the
        trusted list, possibly after confirming with the user (via config.handler).
        Updates the L{trust} database, and then calls L{trust.TrustDB.notify}.
        @since: 0.53
        @arg pending: an object holding details of the updated feed
        @type pending: L{PendingFeed}
        @return: A blocker that triggers when the user has chosen, or None if already done.
        @rtype: None | L{Blocker}
        @raise SafeException: none of the feed's signatures is a valid signature"""

        assert pending.sigs

        from zeroinstall.injector import gpg
        valid_sigs = [s for s in pending.sigs if isinstance(s, gpg.ValidSig)]
        if not valid_sigs:
            # Report every signature, with any GPG messages, so the user can see why each was rejected.
            def format_sig(sig):
                msg = str(sig)
                if sig.messages:
                    msg += "\nMessages from GPG:\n" + sig.messages
                return msg
            raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
                    {'url': pending.url, 'signatures': ''.join(['\n- ' + format_sig(s) for s in pending.sigs])})

        # Start downloading information about the keys...
        fetcher = self.config.fetcher
        kfs = {}    # maps each valid signature to the object returned by fetch_key_info
        for sig in valid_sigs:
            kfs[sig] = fetcher.fetch_key_info(sig.fingerprint)

        # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
        # box update while the user is looking at it, and may allow it to be skipped completely in some
        # cases.
        timeout = tasks.TimeoutBlocker(KEY_INFO_TIMEOUT, "key info timeout")
        while True:
            # A fetch whose blocker is None is no longer being waited for.
            key_info_blockers = [sig_info.blocker for sig_info in kfs.values() if sig_info.blocker is not None]
            if not key_info_blockers:
                break
            info("Waiting for response from key-info server: %s", key_info_blockers)
            yield [timeout] + key_info_blockers
            if timeout.happened:
                info("Timeout waiting for key info response")
                break

        # If we're already confirming something else, wait for that to finish...
        while self._current_confirm is not None:
            info("Waiting for previous key confirmations to finish")
            yield self._current_confirm

        domain = domain_from_url(pending.url)

        if self.config.auto_approve_keys:
            # Only auto-approve for feeds we don't already have in the cache.
            existing_feed = self.config.iface_cache.get_feed(pending.url)
            if not existing_feed:
                changes = False
                for sig, kf in kfs.items():
                    for key_info in kf.info:
                        if key_info.getAttribute("vote") == "good":
                            info(_("Automatically approving key for new feed %s based on response from key info server"), pending.url)
                            trust_db.trust_key(sig.fingerprint, domain)
                            changes = True
                if changes:
                    trust_db.notify()

        # Check whether we still need to confirm. The user may have
        # already approved one of the keys while dealing with another
        # feed, or we may have just auto-approved it.
        for sig in kfs:
            is_trusted = trust_db.is_trusted(sig.fingerprint, domain)
            if is_trusted:
                return

        # Take the lock and confirm this feed
        self._current_confirm = lock = tasks.Blocker('confirm key lock')
        try:
            done = self.config.handler.confirm_import_feed(pending, kfs)
            if done is not None:
                yield done
                # presumably re-raises any error recorded on the blocker - confirm against tasks.check
                tasks.check(done)
        finally:
            # Release the lock even on failure, and wake anyone waiting in the loop above.
            self._current_confirm = None
            lock.trigger()