Merged changes from master
[zeroinstall/solver.git] / zeroinstall / injector / trust.py
blobada9479e3a9f5bec3b8186dcf0870581a03b198e
1 """
2 Records who we trust to sign feeds.
4 Trust is divided up into domains, so that it is possible to trust a key
5 in some cases and not others.
7 @var trust_db: Singleton trust database instance.
8 """
10 # Copyright (C) 2009, Thomas Leonard
11 # See the README file for details, or visit http://0install.net.
13 from zeroinstall import _, SafeException, logger
14 import os
16 from zeroinstall import support
17 from zeroinstall.support import basedir, tasks
18 from .namespaces import config_site, config_prog, XMLNS_TRUST
20 KEY_INFO_TIMEOUT = 10 # Maximum time to wait for response from key-info-server
22 class TrustDB(object):
23 """A database of trusted keys.
24 @ivar keys: maps trusted key fingerprints to a set of domains for which where it is trusted
25 @type keys: {str: set(str)}
26 @ivar watchers: callbacks invoked by L{notify}
27 @see: L{trust_db} - the singleton instance of this class"""
28 __slots__ = ['keys', 'watchers']
30 def __init__(self):
31 self.keys = None
32 self.watchers = []
34 def is_trusted(self, fingerprint, domain = None):
35 self.ensure_uptodate()
37 domains = self.keys.get(fingerprint, None)
38 if not domains: return False # Unknown key
40 if domain is None:
41 return True # Deprecated
43 return domain in domains or '*' in domains
45 def get_trust_domains(self, fingerprint):
46 """Return the set of domains in which this key is trusted.
47 If the list includes '*' then the key is trusted everywhere.
48 @since: 0.27
49 """
50 self.ensure_uptodate()
51 return self.keys.get(fingerprint, set())
53 def get_keys_for_domain(self, domain):
54 """Return the set of keys trusted for this domain.
55 @since: 0.27"""
56 self.ensure_uptodate()
57 return set([fp for fp in self.keys
58 if domain in self.keys[fp]])
60 def trust_key(self, fingerprint, domain = '*'):
61 """Add key to the list of trusted fingerprints.
62 @param fingerprint: base 16 fingerprint without any spaces
63 @type fingerprint: str
64 @param domain: domain in which key is to be trusted
65 @type domain: str
66 @note: call L{notify} after trusting one or more new keys"""
67 if self.is_trusted(fingerprint, domain): return
69 int(fingerprint, 16) # Ensure fingerprint is valid
71 if fingerprint not in self.keys:
72 self.keys[fingerprint] = set()
74 #if domain == '*':
75 # warn("Calling trust_key() without a domain is deprecated")
77 self.keys[fingerprint].add(domain)
78 self.save()
80 def untrust_key(self, key, domain = '*'):
81 self.ensure_uptodate()
82 self.keys[key].remove(domain)
84 if not self.keys[key]:
85 # No more domains for this key
86 del self.keys[key]
88 self.save()
90 def save(self):
91 from xml.dom import minidom
92 import tempfile
94 doc = minidom.Document()
95 root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys')
96 root.setAttribute('xmlns', XMLNS_TRUST)
97 doc.appendChild(root)
99 for fingerprint in self.keys:
100 keyelem = doc.createElementNS(XMLNS_TRUST, 'key')
101 root.appendChild(keyelem)
102 keyelem.setAttribute('fingerprint', fingerprint)
103 for domain in self.keys[fingerprint]:
104 domainelem = doc.createElementNS(XMLNS_TRUST, 'domain')
105 domainelem.setAttribute('value', domain)
106 keyelem.appendChild(domainelem)
108 d = basedir.save_config_path(config_site, config_prog)
110 with tempfile.NamedTemporaryFile(dir = d, prefix = 'trust-', delete = False, mode = 'wt') as tmp:
111 doc.writexml(tmp, indent = "", addindent = " ", newl = "\n", encoding = 'utf-8')
112 support.portable_rename(tmp.name, os.path.join(d, 'trustdb.xml'))
114 def notify(self):
115 """Call all watcher callbacks.
116 This should be called after trusting or untrusting one or more new keys.
117 @since: 0.25"""
118 for w in self.watchers: w()
120 def ensure_uptodate(self):
121 from xml.dom import minidom
123 # This is a bit inefficient... (could cache things)
124 self.keys = {}
126 trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml')
127 if trust:
128 keys = minidom.parse(trust).documentElement
129 for key in keys.getElementsByTagNameNS(XMLNS_TRUST, 'key'):
130 domains = set()
131 self.keys[key.getAttribute('fingerprint')] = domains
132 for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'):
133 domains.add(domain.getAttribute('value'))
134 else:
135 # Convert old database to XML format
136 trust = basedir.load_first_config(config_site, config_prog, 'trust')
137 if trust:
138 #print "Loading trust from", trust_db
139 with open(trust, 'rt') as stream:
140 for key in stream:
141 if key:
142 self.keys[key] = set(['*'])
144 def domain_from_url(url):
145 """Extract the trust domain for a URL.
146 @param url: the feed's URL
147 @type url: str
148 @return: the trust domain
149 @rtype: str
150 @since: 0.27
151 @raise SafeException: the URL can't be parsed"""
152 try:
153 import urlparse
154 except ImportError:
155 from urllib import parse as urlparse # Python 3
157 if os.path.isabs(url):
158 raise SafeException(_("Can't get domain from a local path: '%s'") % url)
159 domain = urlparse.urlparse(url)[1]
160 if domain and domain != '*':
161 return domain
162 raise SafeException(_("Can't extract domain from URL '%s'") % url)
164 trust_db = TrustDB()
166 class TrustMgr(object):
167 """A TrustMgr handles the process of deciding whether to trust new keys
168 (contacting the key information server, prompting the user, accepting automatically, etc)
169 @since: 0.53"""
171 __slots__ = ['config', '_current_confirm']
173 def __init__(self, config):
174 self.config = config
175 self._current_confirm = None # (a lock to prevent asking the user multiple questions at once)
177 @tasks.async
178 def confirm_keys(self, pending):
179 """We don't trust any of the signatures yet. Collect information about them and add the keys to the
180 trusted list, possibly after confirming with the user (via config.handler).
181 Updates the L{trust} database, and then calls L{trust.TrustDB.notify}.
182 @since: 0.53
183 @arg pending: an object holding details of the updated feed
184 @type pending: L{PendingFeed}
185 @return: A blocker that triggers when the user has chosen, or None if already done.
186 @rtype: None | L{Blocker}"""
188 assert pending.sigs
190 from zeroinstall.injector import gpg
191 valid_sigs = [s for s in pending.sigs if isinstance(s, gpg.ValidSig)]
192 if not valid_sigs:
193 def format_sig(sig):
194 msg = str(sig)
195 if sig.messages:
196 msg += "\nMessages from GPG:\n" + sig.messages
197 return msg
198 raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
199 {'url': pending.url, 'signatures': ''.join(['\n- ' + format_sig(s) for s in pending.sigs])})
201 # Start downloading information about the keys...
202 fetcher = self.config.fetcher
203 kfs = {}
204 for sig in valid_sigs:
205 kfs[sig] = fetcher.fetch_key_info(sig.fingerprint)
207 # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
208 # box update while the user is looking at it, and may allow it to be skipped completely in some
209 # cases.
210 timeout = tasks.TimeoutBlocker(KEY_INFO_TIMEOUT, "key info timeout")
211 while True:
212 key_info_blockers = [sig_info.blocker for sig_info in kfs.values() if sig_info.blocker is not None]
213 if not key_info_blockers:
214 break
215 logger.info("Waiting for response from key-info server: %s", key_info_blockers)
216 yield [timeout] + key_info_blockers
217 if timeout.happened:
218 logger.info("Timeout waiting for key info response")
219 break
221 # If we're already confirming something else, wait for that to finish...
222 while self._current_confirm is not None:
223 logger.info("Waiting for previous key confirmations to finish")
224 yield self._current_confirm
226 domain = domain_from_url(pending.url)
228 if self.config.auto_approve_keys:
229 existing_feed = self.config.iface_cache.get_feed(pending.url)
230 if not existing_feed:
231 changes = False
232 for sig, kf in kfs.items():
233 for key_info in kf.info:
234 if key_info.getAttribute("vote") == "good":
235 logger.info(_("Automatically approving key for new feed %s based on response from key info server"), pending.url)
236 trust_db.trust_key(sig.fingerprint, domain)
237 changes = True
238 if changes:
239 trust_db.notify()
241 # Check whether we still need to confirm. The user may have
242 # already approved one of the keys while dealing with another
243 # feed, or we may have just auto-approved it.
244 for sig in kfs:
245 is_trusted = trust_db.is_trusted(sig.fingerprint, domain)
246 if is_trusted:
247 return
249 # Take the lock and confirm this feed
250 self._current_confirm = lock = tasks.Blocker('confirm key lock')
251 try:
252 done = self.config.handler.confirm_import_feed(pending, kfs)
253 if done is not None:
254 yield done
255 tasks.check(done)
256 finally:
257 self._current_confirm = None
258 lock.trigger()