Assume errors from GPG are UTF-8
[zeroinstall.git] / zeroinstall / injector / gpg.py
blobd523a5e9dedf406a155a7f4df4b6ca8197ef0d8f
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 from zeroinstall import _
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
17 from logging import info, warn
19 from zeroinstall.support import find_in_path, basedir
20 from zeroinstall.injector.trust import trust_db
21 from zeroinstall.injector.model import SafeException
# Cached start of the gpg command line (executable plus standing options).
# Worked out lazily by the first call to _run_gpg().
_gnupg_options = None


def _run_gpg(args, **kwargs):
    """Start a GnuPG child process.

    The common part of the command line is computed once and cached in the
    module-level C{_gnupg_options}: the first of 'gpg' / 'gpg2' located by
    L{find_in_path} (falling back to plain 'gpg'), followed by
    '--no-secmem-warning'. When running with effective uid 0 and GNUPGHOME is
    not set in the environment, '--homedir' is added, pointing at '.gnupg'
    under C{basedir.home}.

    @param args: extra arguments, appended after the cached options
    @type args: [str]
    @param kwargs: passed unchanged to L{subprocess.Popen}
    @return: the new child process
    @rtype: L{subprocess.Popen}"""
    global _gnupg_options

    if _gnupg_options is None:
        executable = find_in_path('gpg') or find_in_path('gpg2') or 'gpg'
        _gnupg_options = [executable, '--no-secmem-warning']

        # os.geteuid does not exist on every platform (e.g. Windows).
        running_as_root = hasattr(os, 'geteuid') and os.geteuid() == 0
        if running_as_root and 'GNUPGHOME' not in os.environ:
            _gnupg_options.extend(['--homedir', os.path.join(basedir.home, '.gnupg')])
            info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1])

    command = _gnupg_options + args
    return subprocess.Popen(command, **kwargs)
class Signature(object):
    """Base class for the outcome of checking a single signature.
    @ivar status: the raw data returned by GPG
    @ivar messages: any messages printed by GPG which may be relevant to this signature
    """
    # Class-level defaults; __init__ replaces status on every instance.
    status = None
    messages = None

    def __init__(self, status):
        self.status = status

    def is_trusted(self, domain=None):
        """Whether the user trusts this signature.
        This base implementation ignores C{domain} and always answers False."""
        return False

    def need_key(self):
        """The ID of the key that has to be downloaded before this signature
        can be checked. This base implementation always returns None."""
        return None
class ValidSig(Signature):
    """The result of a signature check that succeeded."""
    # Positions, within the status data, of the fields used below.
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain=None):
        """Look the signing key's fingerprint up in L{trust.trust_db}."""
        signer = self.status[self.FINGERPRINT]
        return trust_db.is_trusted(signer, domain)

    def get_timestamp(self):
        """The time at which this signature was made, as an integer."""
        raw_time = self.status[self.TIMESTAMP]
        return int(raw_time)

    @property
    def fingerprint(self):
        # Read-only shortcut for the fingerprint field of the status data.
        return self.status[self.FINGERPRINT]

    def get_details(self):
        """Run 'gpg --list-keys' for this key and return its output as a table.
        The output is split on newlines and each line on ':'; nothing is
        filtered out, so text ending in a newline yields a final [''] row.
        A non-zero exit status from gpg is only logged.
        @rtype: [[str]]"""
        # Note: GnuPG 2 always uses --fixed-list-mode
        child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint],
                         stdout=subprocess.PIPE)
        output = child.communicate()[0]
        if child.returncode:
            info(_("GPG exited with code %d") % child.returncode)
        return [row.split(':') for row in output.split('\n')]
class BadSig(Signature):
    """A signature that does not match the message it was attached to."""
    # Position of the key ID within the status data.
    KEYID = 0

    def __str__(self):
        key_id = self.status[self.KEYID]
        return _("BAD signature by %s (the message has been tampered with)") % key_id
class ErrSig(Signature):
    """Error while checking a signature.

    C{status} holds the arguments of GnuPG's ERRSIG status line, which GnuPG
    documents as:
    <keyid> <pkalgo> <hashalgo> <sig_class> <time> <rc> [<fpr>]
    """
    KEYID = 0
    ALG = 1
    # The reason code is the sixth field and must be counted from the front:
    # newer GnuPG releases append a fingerprint field after it, so taking the
    # last field (-1) would hand the fingerprint to int() and fail.
    RC = 5

    def __str__(self):
        msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
        rc = int(self.status[self.RC])
        if rc == 4:
            msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        elif rc == 9:
            msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
        else:
            msg += _("Unknown reason code %d") % rc
        return msg

    def need_key(self):
        """Return the ID of the key to download when the check failed because
        the key is unknown (reason code 9); otherwise return None."""
        rc = int(self.status[self.RC])
        if rc == 9:
            return self.status[self.KEYID]
        return None
class Key:
    """A GPG key.
    @since: 0.27
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: the name recorded for the key; '(unknown)' until one is set
    @type name: str
    """
    def __init__(self, fingerprint):
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the part of L{name} that comes before the first ' (' or ' <'."""
        before_bracket = self.name.split(' (', 1)[0]
        return before_bracket.split(' <', 1)[0]
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to look up (any iterable)
    @type fingerprints: [str]
    @return: the loaded keys, indexed by fingerprint. Every requested
    fingerprint gets an entry, whether or not gpg reported a name for it.
    @rtype: {str: L{Key}}
    @since: 0.27"""
    import codecs

    # Take a private list: the argument is tested for emptiness and then
    # appended to the command line, neither of which works for every iterable.
    fingerprints = list(fingerprints)

    # Otherwise GnuPG returns everything...
    if not fingerprints:
        return {}

    keys = dict((fp, Key(fp)) for fp in fingerprints)

    current_fpr = None
    current_uid = None

    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + fingerprints,
                     stdout=subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new public key begins: forget the previous key's state.
                current_fpr = None
                current_uid = None
            elif line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    try:
                        keys[current_fpr].name = codecs.decode(current_uid, 'utf-8')
                    except UnicodeError:
                        # Keep the raw value rather than lose the name.
                        warn("Not UTF-8: %s", current_uid)
                        keys[current_fpr].name = current_uid
            elif line.startswith('uid:'):
                assert current_fpr is not None
                # Only take primary UID
                if current_uid:
                    continue
                current_uid = line.split(':')[9]
                if current_fpr in keys:
                    # NOTE(review): stored undecoded here, but decoded as UTF-8
                    # in the subordinate-key case above - confirm which form
                    # callers expect before unifying the two.
                    keys[current_fpr].name = current_uid
    finally:
        # Close our end of the pipe before reaping: if the loop stopped early,
        # gpg could otherwise block writing to a full pipe and wait() would
        # never return.
        child.stdout.close()
        if child.wait():
            warn(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
def load_key(fingerprint):
    """Ask gpg about a single key, by way of L{load_keys}.
    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    Anything gpg writes to stderr is collected and decoded as UTF-8
    (undecodable bytes are replaced). If gpg succeeds the text is logged as
    a warning; if it fails the text becomes the exception message.
    @param stream: the key data, given to gpg as its standard input
    @raise SafeException: if gpg exits with a non-zero status"""
    errors = tempfile.TemporaryFile()

    child = _run_gpg(['--quiet', '--import', '--batch'],
                     stdin=stream, stderr=errors)
    status = child.wait()

    errors.seek(0)
    error_messages = errors.read().strip()
    errors.close()

    if error_messages:
        import codecs
        decode_utf8 = codecs.getdecoder('utf-8')
        # The decoder returns (text, bytes consumed); only the text is wanted.
        error_messages = decode_utf8(error_messages, 'replace')[0]

    if status == 0:
        if error_messages:
            warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
        return

    if error_messages:
        raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
    raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
def _check_plain_stream(stream):
    """Check a plain GPG-signed stream by passing it through 'gpg --decrypt'.
    gpg's output is collected in a temporary file and its status messages are
    read from a dedicated pipe.
    @param stream: the signed data, given to gpg as its standard input
    @return: (data, sigs), where data is the temporary file holding gpg's
    output, positioned at its start, and sigs is the list of signatures found
    @raise SafeException: from L{_get_sigs_from_gpg_status_stream}, if no
    signatures were found"""
    data = tempfile.TemporaryFile()  # Python2.2 does not support 'prefix'
    errors = tempfile.TemporaryFile()

    status_r, status_w = os.pipe()
    status_stream = os.fdopen(status_r)
    try:
        try:
            # Note: Should ideally close status_r in the child, but we want to support Windows too
            child = _run_gpg(['--decrypt',
                              # Not all versions support this:
                              #'--max-output', str(1024 * 1024),
                              '--batch',
                              '--status-fd', str(status_w)],
                             stdin=stream,
                             stdout=data,
                             stderr=errors)
        finally:
            # Only the child writes to the pipe. Our copy of the write end must
            # go whether or not gpg started, or the reader never sees EOF.
            os.close(status_w)

        sigs = _get_sigs_from_gpg_status_stream(status_stream, child, errors)
    finally:
        # Release the read end explicitly instead of waiting for the garbage
        # collector; closing errors again after the helper did is harmless.
        status_stream.close()
        errors.close()
        data.seek(0)
    return (data, sigs)
def _check_xml_stream(stream):
    """Check an XML document whose signature is stored in a trailing comment.
    The document must end with a comment that starts on its own line with
    '<!-- Base64 Signature', contains the base64-encoded detached signature
    and ends with '-->'. Everything before that comment (including the
    preceding newline) is the signed data.
    @param stream: the XML document; it is read in full and rewound afterwards
    @return: (stream, sigs) - the stream that was passed in, rewound to its
    start, and the list of signatures found
    @raise SafeException: if the signature block is missing or malformed, or
    if no signatures were found"""
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1  # Include new-line in data

    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    # Drop trailing blank lines. The first line was just checked to be
    # non-blank, so this can never empty the list.
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        sig_data = base64.decodestring(sig_data)  # (b64decode is Python 2.4)
    except Exception as ex:
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    # Temporary files are only created once the signature block is known to be
    # well-formed, and are always released again below.
    data = tempfile.TemporaryFile()
    errors = tempfile.TemporaryFile()
    sig_name = None
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the descriptor itself, since gpg inherits it as stdin.
        os.lseek(data.fileno(), 0, 0)

        sig_fd, sig_name = tempfile.mkstemp(prefix='injector-sig-')
        # The decoded signature is binary: text mode would translate newline
        # bytes on Windows and corrupt it.
        sig_file = os.fdopen(sig_fd, 'wb')
        try:
            sig_file.write(sig_data)
        finally:
            sig_file.close()

        child = _run_gpg([# Not all versions support this:
                          #'--max-output', str(1024 * 1024),
                          '--batch',
                          # Windows GPG can only cope with "1" here
                          '--status-fd', '1',
                          '--verify', sig_name, '-'],
                         stdin=data,
                         stdout=subprocess.PIPE,
                         stderr=errors)

        try:
            sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
        finally:
            child.stdout.close()
            os.lseek(stream.fileno(), 0, 0)
            stream.seek(0)
    finally:
        if sig_name is not None:
            os.unlink(sig_name)
        # Closing errors a second time (the helper closes it too) is harmless.
        errors.close()
        data.close()
    return (stream, sigs)
def check_stream(stream):
    """Check the signatures on a feed and return its data.
    The first six bytes select the format: a stream starting with "<?xml "
    carries its signature in a comment at the end, and the stream itself is
    returned as the data. A stream starting with '-----B' is a plain
    GPG-signed message (deprecated) and is passed through gpg --decrypt.
    Anything else is rejected. stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if the stream is in neither format"""
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)

    if start != '-----B':
        raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))

    import warnings
    warnings.warn(_("Plain GPG-signed feeds are deprecated!"), DeprecationWarning, stacklevel=2)
    # Rewind the descriptor as well, since gpg reads from it directly.
    os.lseek(stream.fileno(), 0, 0)
    return _check_plain_stream(stream)
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child' and close 'errors'.
    @param status_r: stream of '[GNUPG:] ...' status lines written by gpg
    @param child: the gpg process, which is waited for once status_r is exhausted
    @param errors: seekable file that received gpg's stderr
    @return: one L{Signature} per VALIDSIG, BADSIG or ERRSIG line, in order.
    If gpg wrote anything to stderr, that text is attached to every
    signature as C{messages}.
    @raise SafeException: if there are no signatures (using the text from
    'errors' for the message if non-empty)"""
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    for line in status_r:
        assert line.endswith('\n')
        assert line.startswith('[GNUPG:] ')
        # Strip the 9-character '[GNUPG:] ' prefix and the trailing newline.
        line = line[9:-1]
        split_line = line.split(' ')
        code = split_line[0]
        args = split_line[1:]
        if code == 'VALIDSIG':
            sigs.append(ValidSig(args))
        elif code == 'BADSIG':
            sigs.append(BadSig(args))
        elif code == 'ERRSIG':
            sigs.append(ErrSig(args))

    child.wait()

    errors.seek(0)
    error_messages = errors.read().strip()
    errors.close()

    # Assume errors from GPG are UTF-8, as import_key does. Formatting raw
    # non-ASCII bytes into a translated message could otherwise fail.
    if isinstance(error_messages, bytes):
        error_messages = error_messages.decode('utf-8', 'replace')

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        else:
            raise SafeException(_("No signatures found. No error messages from GPG."))
    elif error_messages:
        # Attach the warnings to all the signatures, in case they're useful.
        for s in sigs:
            s.messages = error_messages

    return sigs