"""
Records who we trust to sign feeds.

Trust is divided up into domains, so that it is possible to trust a key
in some cases and not others.

@var trust_db: Singleton trust database instance.
"""
10 # Copyright (C) 2009, Thomas Leonard
11 # See the README file for details, or visit http://0install.net.
13 from zeroinstall
import _
, SafeException
15 from logging
import info
17 from zeroinstall
.support
import basedir
, tasks
18 from .namespaces
import config_site
, config_prog
, XMLNS_TRUST
# Maximum time, in seconds, to wait for a response from the key-info server.
KEY_INFO_TIMEOUT = 10
class TrustDB(object):
    """A database of trusted keys.

    The database is re-read from disk by L{ensure_uptodate} before every
    query and written back by L{save} after every change.

    @ivar keys: maps trusted key fingerprints to the set of domains in which
        each key is trusted ('*' means every domain); None until first loaded
    @type keys: {str: set(str)}
    @ivar watchers: callbacks invoked by L{notify}
    @see: L{trust_db} - the singleton instance of this class"""
    __slots__ = ['keys', 'watchers']

    def __init__(self):
        self.keys = None        # Filled in by ensure_uptodate()
        self.watchers = []

    def is_trusted(self, fingerprint, domain=None):
        """Check whether a key is trusted.
        @param fingerprint: fingerprint of the key to look up
        @type fingerprint: str
        @param domain: domain to check, or None to ask whether the key is
            trusted in at least one domain (deprecated)
        @type domain: str | None
        @return: True if the key is trusted for this domain
        @rtype: bool"""
        self.ensure_uptodate()

        domains = self.keys.get(fingerprint, None)
        if not domains:
            return False    # Unknown key

        if domain is None:
            return True     # Deprecated

        return domain in domains or '*' in domains

    def get_trust_domains(self, fingerprint):
        """Return the set of domains in which this key is trusted.
        If the list includes '*' then the key is trusted everywhere.
        @param fingerprint: fingerprint of the key to look up
        @type fingerprint: str
        @return: the trusted domains (empty if the key is unknown)
        @rtype: set(str)"""
        self.ensure_uptodate()
        return self.keys.get(fingerprint, set())

    def get_keys_for_domain(self, domain):
        """Return the set of keys trusted for this domain.
        Only keys listing exactly this domain are returned; a key trusted
        for '*' is not included unless domain is itself '*'.
        @param domain: the domain to look up
        @type domain: str
        @return: fingerprints of the matching keys
        @rtype: set(str)"""
        self.ensure_uptodate()
        return set(fp for fp, domains in self.keys.items()
                   if domain in domains)

    def trust_key(self, fingerprint, domain='*'):
        """Add key to the list of trusted fingerprints.
        @param fingerprint: base 16 fingerprint without any spaces
        @type fingerprint: str
        @param domain: domain in which key is to be trusted
        @type domain: str
        @raise ValueError: if fingerprint is not a base 16 number
        @note: call L{notify} after trusting one or more new keys"""
        if self.is_trusted(fingerprint, domain):
            return

        int(fingerprint, 16)    # Ensure fingerprint is valid

        if fingerprint not in self.keys:
            self.keys[fingerprint] = set()

        self.keys[fingerprint].add(domain)
        # ensure_uptodate() reloads from disk on the next query, so the
        # change must be written out now or it would be lost.
        self.save()

    def untrust_key(self, key, domain='*'):
        """Stop trusting a key in one domain.
        @param key: fingerprint of the key
        @type key: str
        @param domain: domain in which the key is no longer to be trusted
        @type domain: str
        @raise KeyError: if the key is not currently trusted for this domain
        @note: call L{notify} after untrusting one or more keys"""
        self.ensure_uptodate()
        self.keys[key].remove(domain)

        if not self.keys[key]:
            # No more domains for this key
            del self.keys[key]

        self.save()

    def save(self):
        """Write the current set of trusted keys to 'trustdb.xml' in the
        user's configuration directory.
        The data is written to a temporary file in the same directory and
        then renamed into place, so readers never see a half-written file."""
        import os
        import tempfile
        from xml.dom import minidom

        doc = minidom.Document()
        root = doc.createElementNS(XMLNS_TRUST, 'trusted-keys')
        root.setAttribute('xmlns', XMLNS_TRUST)
        # The root must be attached, otherwise nothing but the XML
        # declaration is serialised.
        doc.appendChild(root)

        for fingerprint in self.keys:
            keyelem = doc.createElementNS(XMLNS_TRUST, 'key')
            root.appendChild(keyelem)
            keyelem.setAttribute('fingerprint', fingerprint)
            for domain in self.keys[fingerprint]:
                domainelem = doc.createElementNS(XMLNS_TRUST, 'domain')
                domainelem.setAttribute('value', domain)
                keyelem.appendChild(domainelem)

        # Serialise to bytes up front: the temporary file is opened in binary
        # mode, and a serialisation error then leaves no stray file behind.
        data = doc.toprettyxml(indent=" ", newl="\n", encoding='utf-8')

        d = basedir.save_config_path(config_site, config_prog)
        fd, tmpname = tempfile.mkstemp(dir=d, prefix='trust-')
        try:
            # Close (and so flush) the file before renaming it into place.
            with os.fdopen(fd, 'wb') as tmp:
                tmp.write(data)
            os.rename(tmpname, os.path.join(d, 'trustdb.xml'))
        except BaseException:
            if os.path.exists(tmpname):
                os.unlink(tmpname)
            raise

    def notify(self):
        """Call all watcher callbacks.
        This should be called after trusting or untrusting one or more new keys."""
        for w in self.watchers:
            w()

    def ensure_uptodate(self):
        """Reload L{keys} from disk.
        Sources are tried in order: the XML database 'trustdb.xml', then the
        old plain-text 'trust' file (one fingerprint per line, each trusted
        everywhere), then a built-in default."""
        from xml.dom import minidom

        # This is a bit inefficient... (could cache things)
        self.keys = {}

        # NOTE(review): assumes load_first_config() returns a false value
        # when no such file exists - confirm against basedir.
        trust = basedir.load_first_config(config_site, config_prog, 'trustdb.xml')
        if trust:
            root = minidom.parse(trust).documentElement
            for key in root.getElementsByTagNameNS(XMLNS_TRUST, 'key'):
                domains = set()
                self.keys[key.getAttribute('fingerprint')] = domains
                for domain in key.getElementsByTagNameNS(XMLNS_TRUST, 'domain'):
                    domains.add(domain.getAttribute('value'))
            return

        # Convert old database to XML format
        trust = basedir.load_first_config(config_site, config_prog, 'trust')
        if trust:
            with open(trust) as stream:
                for key in stream.read().split('\n'):
                    if key:     # Skip blank lines (e.g. after the final newline)
                        self.keys[key] = set(['*'])
            return

        # No trust database found.
        # Trust Thomas Leonard's key for 0install.net by default.
        # Avoids distracting confirmation box on first run when we check
        # for updates to the GUI.
        self.keys['92429807C9853C0744A68B9AAE07828059A53CC1'] = set(['0install.net'])
def domain_from_url(url):
    """Extract the trust domain for a URL.
    The domain is the network-location part of the URL, exactly as written
    (it is not normalised, so any port number is kept).
    @param url: the feed's URL
    @type url: str
    @return: the trust domain
    @rtype: str
    @raise SafeException: the URL can't be parsed"""
    import os
    try:
        from urlparse import urlparse           # Python 2
    except ImportError:
        from urllib.parse import urlparse       # Python 3

    if os.path.isabs(url):
        raise SafeException(_("Can't get domain from a local path: '%s'") % url)
    domain = urlparse(url)[1]
    # '*' is reserved as the "trusted everywhere" wildcard, so it can never
    # be accepted as a real domain.
    if domain and domain != '*':
        return domain
    raise SafeException(_("Can't extract domain from URL '%s'") % url)
169 class TrustMgr(object):
170 """A TrustMgr handles the process of deciding whether to trust new keys
171 (contacting the key information server, prompting the user, accepting automatically, etc)
174 __slots__
= ['config', '_current_confirm']
176 def __init__(self
, config
):
178 self
._current
_confirm
= None # (a lock to prevent asking the user multiple questions at once)
181 def confirm_keys(self
, pending
):
182 """We don't trust any of the signatures yet. Collect information about them and add the keys to the
183 trusted list, possibly after confirming with the user (via config.handler).
184 Updates the L{trust} database, and then calls L{trust.TrustDB.notify}.
186 @arg pending: an object holding details of the updated feed
187 @type pending: L{PendingFeed}
188 @return: A blocker that triggers when the user has chosen, or None if already done.
189 @rtype: None | L{Blocker}"""
193 from zeroinstall
.injector
import gpg
194 valid_sigs
= [s
for s
in pending
.sigs
if isinstance(s
, gpg
.ValidSig
)]
199 msg
+= "\nMessages from GPG:\n" + sig
.messages
201 raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
202 {'url': pending
.url
, 'signatures': ''.join(['\n- ' + format_sig(s
) for s
in pending
.sigs
])})
204 # Start downloading information about the keys...
205 fetcher
= self
.config
.fetcher
207 for sig
in valid_sigs
:
208 kfs
[sig
] = fetcher
.fetch_key_info(sig
.fingerprint
)
210 # Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
211 # box update while the user is looking at it, and may allow it to be skipped completely in some
213 timeout
= tasks
.TimeoutBlocker(KEY_INFO_TIMEOUT
, "key info timeout")
215 key_info_blockers
= [sig_info
.blocker
for sig_info
in kfs
.values() if sig_info
.blocker
is not None]
216 if not key_info_blockers
:
218 info("Waiting for response from key-info server: %s", key_info_blockers
)
219 yield [timeout
] + key_info_blockers
221 info("Timeout waiting for key info response")
224 # If we're already confirming something else, wait for that to finish...
225 while self
._current
_confirm
is not None:
226 info("Waiting for previous key confirmations to finish")
227 yield self
._current
_confirm
229 domain
= domain_from_url(pending
.url
)
231 if self
.config
.auto_approve_keys
:
232 existing_feed
= self
.config
.iface_cache
.get_feed(pending
.url
)
233 if not existing_feed
:
235 for sig
, kf
in kfs
.iteritems():
236 for key_info
in kf
.info
:
237 if key_info
.getAttribute("vote") == "good":
238 info(_("Automatically approving key for new feed %s based on response from key info server"), pending
.url
)
239 trust_db
.trust_key(sig
.fingerprint
, domain
)
244 # Check whether we still need to confirm. The user may have
245 # already approved one of the keys while dealing with another
246 # feed, or we may have just auto-approved it.
248 is_trusted
= trust_db
.is_trusted(sig
.fingerprint
, domain
)
252 # Take the lock and confirm this feed
253 self
._current
_confirm
= lock
= tasks
.Blocker('confirm key lock')
255 done
= self
.config
.handler
.confirm_import_feed(pending
, kfs
)
260 self
._current
_confirm
= None