2 Records who we trust to sign feeds.
4 Trust is divided up into domains, so that it is possible to trust a key
5 in some cases and not others.
7 @var trust_db: Singleton trust database instance.
10 # Copyright (C) 2009, Thomas Leonard
11 # See the README file for details, or visit http://0install.net.
13 from zeroinstall
import _
, SafeException
15 from logging
import info
17 from zeroinstall
import support
18 from zeroinstall
.support
import basedir
, tasks
19 from .namespaces
import config_site
, config_prog
, XMLNS_TRUST
KEY_INFO_TIMEOUT = 10		# Maximum time (in seconds) to wait for response from key-info-server
23 class TrustDB(object):
24 """A database of trusted keys.
@ivar keys: maps each trusted key fingerprint to the set of domains in which it is trusted
26 @type keys: {str: set(str)}
27 @ivar watchers: callbacks invoked by L{notify}
28 @see: L{trust_db} - the singleton instance of this class"""
29 __slots__
= ['keys', 'watchers']
def is_trusted(self, fingerprint, domain=None):
    """Check whether a key is trusted to sign feeds from a domain.

    L{ensure_uptodate} is called before the lookup.
    @param fingerprint: fingerprint of the key to look up
    @type fingerprint: str
    @param domain: domain to check. Passing None is deprecated; it asks
    whether the key is trusted in at least one domain.
    @type domain: str | None
    @return: True if the key is trusted for C{domain} or for the wildcard domain '*'
    @rtype: bool"""
    self.ensure_uptodate()

    domains = self.keys.get(fingerprint)
    if not domains:
        return False    # Unknown key (or a key with no domains recorded)

    if domain is None:
        # Deprecated: without a domain, being trusted anywhere is enough.
        # This must stay conditional, otherwise the domain check below is
        # never reached and every known key is trusted for every domain.
        return True

    return domain in domains or '*' in domains
def get_trust_domains(self, fingerprint):
    """Look up every domain in which a key is trusted.

    If the returned set includes '*' then the key is trusted everywhere.
    @param fingerprint: fingerprint of the key to look up
    @return: the set stored for this key (the stored object itself, not a
    copy), or a new empty set if the key is unknown
    @rtype: set(str)"""
    self.ensure_uptodate()
    known = self.keys
    if fingerprint in known:
        return known[fingerprint]
    return set()
def get_keys_for_domain(self, domain):
    """Collect the fingerprints of all keys trusted for a domain.

    Only keys whose domain set contains C{domain} itself are matched: a key
    trusted solely through the '*' wildcard is not returned unless
    C{domain} is '*'.
    @param domain: the domain to search for
    @return: fingerprints of the matching keys
    @rtype: set(str)"""
    self.ensure_uptodate()
    matching = set()
    for fingerprint, domains in self.keys.items():
        if domain in domains:
            matching.add(fingerprint)
    return matching
def trust_key(self, fingerprint, domain='*'):
    """Record that a key is trusted within a domain.

    Does nothing if L{is_trusted} already reports the key as trusted for
    the domain.
    @param fingerprint: base 16 fingerprint without any spaces
    @type fingerprint: str
    @param domain: domain in which key is to be trusted
    @raise ValueError: if C{fingerprint} cannot be parsed as a base 16 number
    @note: call L{notify} after trusting one or more new keys"""
    already_trusted = self.is_trusted(fingerprint, domain)
    if already_trusted:
        return

    # Parsing is used purely as validation; the numeric value is discarded.
    int(fingerprint, 16)

    # NOTE: calling trust_key() without a domain is considered deprecated,
    # but no warning is emitted for it.
    self.keys.setdefault(fingerprint, set()).add(domain)
def untrust_key(self, key, domain='*'):
    """Stop trusting a key within a domain.

    When the last domain of a key is removed, the key's entry is dropped
    from C{self.keys} so that no empty set is left behind.
    @param key: fingerprint of the key
    @type key: str
    @param domain: domain in which the key should no longer be trusted
    @raise KeyError: if the key is unknown, or is not trusted in C{domain}
    @note: call L{notify} after untrusting one or more keys"""
    self.ensure_uptodate()
    domains = self.keys[key]
    domains.remove(domain)

    if not domains:
        # No more domains for this key
        del self.keys[key]
    # NOTE(review): only the in-memory mapping is updated here - confirm
    # whether this method is also expected to persist the change.
92 from xml
.dom
import minidom
95 doc
= minidom
.Document()
96 root
= doc
.createElementNS(XMLNS_TRUST
, 'trusted-keys')
97 root
.setAttribute('xmlns', XMLNS_TRUST
)
100 for fingerprint
in self
.keys
:
101 keyelem
= doc
.createElementNS(XMLNS_TRUST
, 'key')
102 root
.appendChild(keyelem
)
103 keyelem
.setAttribute('fingerprint', fingerprint
)
104 for domain
in self
.keys
[fingerprint
]:
105 domainelem
= doc
.createElementNS(XMLNS_TRUST
, 'domain')
106 domainelem
.setAttribute('value', domain
)
107 keyelem
.appendChild(domainelem
)
109 d
= basedir
.save_config_path(config_site
, config_prog
)
111 with tempfile
.NamedTemporaryFile(dir = d
, prefix
= 'trust-', delete
= False, mode
= 'wt') as tmp
:
112 doc
.writexml(tmp
, indent
= "", addindent
= " ", newl
= "\n", encoding
= 'utf-8')
113 support
.portable_rename(tmp
.name
, os
.path
.join(d
, 'trustdb.xml'))
116 """Call all watcher callbacks.
117 This should be called after trusting or untrusting one or more new keys.
119 for w
in self
.watchers
: w()
121 def ensure_uptodate(self
):
122 from xml
.dom
import minidom
124 # This is a bit inefficient... (could cache things)
127 trust
= basedir
.load_first_config(config_site
, config_prog
, 'trustdb.xml')
129 keys
= minidom
.parse(trust
).documentElement
130 for key
in keys
.getElementsByTagNameNS(XMLNS_TRUST
, 'key'):
132 self
.keys
[key
.getAttribute('fingerprint')] = domains
133 for domain
in key
.getElementsByTagNameNS(XMLNS_TRUST
, 'domain'):
134 domains
.add(domain
.getAttribute('value'))
136 # Convert old database to XML format
137 trust
= basedir
.load_first_config(config_site
, config_prog
, 'trust')
139 #print "Loading trust from", trust_db
140 with
open(trust
, 'rt') as stream
:
143 self
.keys
[key
] = set(['*'])
def domain_from_url(url):
    """Extract the trust domain for a URL.
    @param url: the feed's URL
    @type url: str
    @return: the trust domain (the network-location component of the URL)
    @rtype: str
    @raise SafeException: the URL can't be parsed"""
    import os
    from urllib import parse as urlparse	# Python 3

    # Local feeds have no network location, so they have no trust domain.
    if os.path.isabs(url):
        raise SafeException(_("Can't get domain from a local path: '%s'") % url)

    domain = urlparse.urlparse(url)[1]
    # '*' is refused as a domain; presumably because the trust database
    # treats it as the "trusted everywhere" wildcard.
    if domain and domain != '*':
        return domain
    raise SafeException(_("Can't extract domain from URL '%s'") % url)
167 class TrustMgr(object):
168 """A TrustMgr handles the process of deciding whether to trust new keys
169 (contacting the key information server, prompting the user, accepting automatically, etc)
172 __slots__
= ['config', '_current_confirm']
174 def __init__(self
, config
):
176 self
._current
_confirm
= None # (a lock to prevent asking the user multiple questions at once)
179 def confirm_keys(self
, pending
):
180 """We don't trust any of the signatures yet. Collect information about them and add the keys to the
181 trusted list, possibly after confirming with the user (via config.handler).
182 Updates the L{trust} database, and then calls L{trust.TrustDB.notify}.
184 @arg pending: an object holding details of the updated feed
185 @type pending: L{PendingFeed}
186 @return: A blocker that triggers when the user has chosen, or None if already done.
187 @rtype: None | L{Blocker}"""
191 from zeroinstall
.injector
import gpg
192 valid_sigs
= [s
for s
in pending
.sigs
if isinstance(s
, gpg
.ValidSig
)]
197 msg
+= "\nMessages from GPG:\n" + sig
.messages
199 raise SafeException(_('No valid signatures found on "%(url)s". Signatures:%(signatures)s') %
200 {'url': pending
.url
, 'signatures': ''.join(['\n- ' + format_sig(s
) for s
in pending
.sigs
])})
202 # Start downloading information about the keys...
203 fetcher
= self
.config
.fetcher
205 for sig
in valid_sigs
:
206 kfs
[sig
] = fetcher
.fetch_key_info(sig
.fingerprint
)
# Wait up to KEY_INFO_TIMEOUT seconds for key information to arrive. Avoids having the dialog
# box update while the user is looking at it, and may allow it to be skipped completely in some
# cases.
211 timeout
= tasks
.TimeoutBlocker(KEY_INFO_TIMEOUT
, "key info timeout")
213 key_info_blockers
= [sig_info
.blocker
for sig_info
in kfs
.values() if sig_info
.blocker
is not None]
214 if not key_info_blockers
:
216 info("Waiting for response from key-info server: %s", key_info_blockers
)
217 yield [timeout
] + key_info_blockers
219 info("Timeout waiting for key info response")
222 # If we're already confirming something else, wait for that to finish...
223 while self
._current
_confirm
is not None:
224 info("Waiting for previous key confirmations to finish")
225 yield self
._current
_confirm
227 domain
= domain_from_url(pending
.url
)
229 if self
.config
.auto_approve_keys
:
230 existing_feed
= self
.config
.iface_cache
.get_feed(pending
.url
)
231 if not existing_feed
:
233 for sig
, kf
in kfs
.items():
234 for key_info
in kf
.info
:
235 if key_info
.getAttribute("vote") == "good":
236 info(_("Automatically approving key for new feed %s based on response from key info server"), pending
.url
)
237 trust_db
.trust_key(sig
.fingerprint
, domain
)
242 # Check whether we still need to confirm. The user may have
243 # already approved one of the keys while dealing with another
244 # feed, or we may have just auto-approved it.
246 is_trusted
= trust_db
.is_trusted(sig
.fingerprint
, domain
)
250 # Take the lock and confirm this feed
251 self
._current
_confirm
= lock
= tasks
.Blocker('confirm key lock')
253 done
= self
.config
.handler
.confirm_import_feed(pending
, kfs
)
258 self
._current
_confirm
= None