Start development series 0.42.1-post
[zeroinstall/zeroinstall-rsl.git] / zeroinstall / injector / gpg.py
blobf9cf6d119a892c7efdbb17ccc534eba6f035fd46
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 from zeroinstall import _
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
17 from logging import info, warn
19 from zeroinstall.support import find_in_path, basedir
20 from zeroinstall.injector.trust import trust_db
21 from zeroinstall.injector.model import SafeException
# Base command line shared by every GnuPG invocation in this module.
_gnupg_options = ['gpg', '--no-secmem-warning']

# When running as root with no explicit GNUPGHOME, point GnuPG at the
# '.gnupg' directory under basedir.home.
if 'GNUPGHOME' not in os.environ and hasattr(os, 'geteuid') and os.geteuid() == 0:
    _gnupg_options.extend(['--homedir', os.path.join(basedir.home, '.gnupg')])
    info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1])
class Signature(object):
    """Common base for the outcome of checking a single signature.

    Subclasses override L{is_trusted} and L{need_key} where appropriate;
    the defaults here describe a signature that is neither trusted nor
    waiting for a key.
    @ivar status: the status arguments this result was created with
    """
    status = None

    def __init__(self, status):
        """@param status: the status arguments describing this signature"""
        self.status = status

    def is_trusted(self, domain=None):
        """Report whether the user trusts this signature.
        @param domain: unused by the base implementation
        @return: always False here"""
        return False

    def need_key(self):
        """Report which key must be downloaded before this signature can be checked.
        @return: the key ID, or None (always None here)"""
        return None
class ValidSig(Signature):
    """The result of a signature check that succeeded."""
    # Positions of the fields we use within self.status.
    FINGERPRINT = 0
    TIMESTAMP = 2

    @property
    def fingerprint(self):
        """The fingerprint field of the status arguments."""
        return self.status[self.FINGERPRINT]

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain=None):
        """Look this signature's fingerprint up in the L{trust.trust_db}.
        @param domain: passed through to trust_db.is_trusted"""
        fingerprint = self.status[self.FINGERPRINT]
        return trust_db.is_trusted(fingerprint, domain)

    def get_timestamp(self):
        """Return the timestamp field of the status arguments as an int."""
        return int(self.status[self.TIMESTAMP])

    def get_details(self):
        """Run 'gpg --with-colons --list-keys' for this fingerprint.
        A non-zero exit status from gpg is only logged; the output is
        still returned.
        @return: gpg's output, one entry per line, each split on ':'
        @rtype: [[str]]"""
        command = _gnupg_options + ['--with-colons', '--list-keys', self.fingerprint]
        child = subprocess.Popen(command, stdout=subprocess.PIPE)
        cout, unused = child.communicate()
        if child.returncode:
            info(_("GPG exited with code %d") % child.returncode)
        return [row.split(':') for row in cout.split('\n')]
class BadSig(Signature):
    """The result for a signature that does not match the signed message."""
    # Position of the key ID within self.status.
    KEYID = 0

    def __str__(self):
        key_id = self.status[self.KEYID]
        return _("BAD signature by %s (the message has been tampered with)") % key_id
class ErrSig(Signature):
    """The result for a signature that could not be checked at all."""
    # Positions of the fields we use within self.status.
    KEYID = 0
    ALG = 1
    RC = -1     # the reason code is the last field

    def __str__(self):
        key_id = self.status[self.KEYID]
        prefix = _("ERROR signature by %s: ") % key_id
        reason = int(self.status[self.RC])
        if reason == 4:
            return prefix + _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        if reason == 9:
            return prefix + _("Unknown key. Try 'gpg --recv-key %s'") % key_id
        return prefix + _("Unknown reason code %d") % reason

    def need_key(self):
        """Return the ID of the missing key when the reason code is 9
        (reported by __str__ as an unknown key), otherwise None."""
        if int(self.status[self.RC]) == 9:
            return self.status[self.KEYID]
        return None
class Key:
    """A GPG key.
    @since: 0.27
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: the key's name; '(unknown)' until something sets it
    @type name: str
    """
    def __init__(self, fingerprint):
        self.name = '(unknown)'
        self.fingerprint = fingerprint

    def get_short_name(self):
        """Return the name with any trailing ' (comment)' and ' <email>' parts removed."""
        without_comment = self.name.partition(' (')[0]
        return without_comment.partition(' <')[0]
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to load; any iterable
    of strings is accepted
    @return: the loaded keys, indexed by fingerprint. A key for which gpg
    reports no user ID keeps the default name set by L{Key}.
    @rtype: {str: L{Key}}
    @since: 0.27"""
    # Take a list copy so that tuples, sets and iterators work too: the
    # fingerprints are concatenated onto the command line below, and an
    # exhausted iterator must still count as "empty".
    fingerprints = list(fingerprints)

    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    child = subprocess.Popen(_gnupg_options + ['--fixed-list-mode', '--with-colons', '--list-keys',
                '--with-fingerprint', '--with-fingerprint'] + fingerprints, stdout = subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # Start of a new primary key record
                current_fpr = None
                current_uid = None
            if line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    keys[current_fpr].name = current_uid
            if line.startswith('uid:'):
                assert current_fpr is not None
                # Only take primary UID
                if current_uid: continue
                parts = line.split(':')
                current_uid = parts[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        # Close our end of the pipe first, so that gpg cannot block on a
        # full pipe (and so hang wait()) if parsing stopped early.
        child.stdout.close()
        if child.wait():
            warn(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
def load_key(fingerprint):
    """Ask gpg about a single key, via L{load_keys}.
    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    @param stream: passed to the gpg child process as its stdin
    @raise SafeException: if gpg writes anything to stderr, or exits with a
    non-zero status"""
    errors = tempfile.TemporaryFile()
    try:
        child = subprocess.Popen(_gnupg_options + ['--quiet', '--import'],
                    stdin = stream, stderr = errors)

        status = child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file even if gpg could not be started
        errors.close()

    if error_messages:
        raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)

    # A failure that printed nothing must not be mistaken for success
    if status != 0:
        raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
def _check_plain_stream(stream):
    """Pass stream through C{gpg --decrypt} and collect the signatures.
    @param stream: passed to the gpg child process as its stdin
    @return: (data, sigs), where data is a temporary file holding gpg's
    standard output, positioned at its start, and sigs is the list returned
    by L{_get_sigs_from_gpg_status_stream}
    @raise SafeException: if no signatures are found"""
    data = tempfile.TemporaryFile()     # Python2.2 does not support 'prefix'
    errors = tempfile.TemporaryFile()

    status_r, status_w = os.pipe()

    try:
        try:
            # Note: Should ideally close status_r in the child, but we want to support Windows too
            child = subprocess.Popen(_gnupg_options + ['--decrypt',
                # Not all versions support this:
                #'--max-output', str(1024 * 1024),
                '--batch',
                '--status-fd', str(status_w)],
                stdin = stream,
                stdout = data,
                stderr = errors)
        finally:
            # Only the child writes to the status pipe
            os.close(status_w)
    except Exception:
        # gpg could not be started: don't leak the pipe or the temp files
        os.close(status_r)
        data.close()
        errors.close()
        raise

    status_stream = os.fdopen(status_r)
    try:
        sigs = _get_sigs_from_gpg_status_stream(status_stream, child, errors)
    finally:
        # Closing the file object also closes the status_r descriptor
        status_stream.close()
        data.seek(0)
    return (data, sigs)
def _check_xml_stream(stream):
    """Check the signature held in a trailing XML comment of stream.

    Everything up to and including the newline before the final
    '<!-- Base64 Signature' comment is the signed data; the comment body is
    the base64-encoded signature, which is checked with C{gpg --verify}.
    @param stream: the signed document; it is read fully and rewound before
    returning
    @return: (stream, sigs) - the stream passed in and the list returned by
    L{_get_sigs_from_gpg_status_stream}
    @raise SafeException: if the signature block is missing or malformed, or
    if no signatures are found"""
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1   # Include new-line in data

    # Validate and decode the signature block before creating any temporary
    # files, so that a malformed block leaves nothing behind.
    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        sig_data = base64.decodestring(sig_data)    # (b64decode is Python 2.4)
    except Exception:
        # sys.exc_info() avoids committing to either the old
        # "except E, name" or the newer "except E as name" syntax.
        import sys
        ex = sys.exc_info()[1]
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    data = tempfile.TemporaryFile()
    errors = tempfile.TemporaryFile()
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        os.lseek(data.fileno(), 0, 0)

        sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
        try:
            # The decoded signature is binary: a text-mode file would
            # corrupt it on platforms that translate line endings.
            sig_file = os.fdopen(sig_fd, 'wb')
            sig_file.write(sig_data)
            sig_file.close()

            # Note: Should ideally close status_r in the child, but we want to support Windows too
            child = subprocess.Popen(_gnupg_options + [
                # Not all versions support this:
                #'--max-output', str(1024 * 1024),
                '--batch',
                # Windows GPG can only cope with "1" here
                '--status-fd', '1',
                '--verify', sig_name, '-'],
                stdin = data,
                stdout = subprocess.PIPE,
                stderr = errors)

            try:
                sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
            finally:
                child.stdout.close()
                os.lseek(stream.fileno(), 0, 0)
                stream.seek(0)
        finally:
            os.unlink(sig_name)
    finally:
        data.close()
        # Normally closed already by the status reader; closing again is harmless
        errors.close()
    return (stream, sigs)
def check_stream(stream):
    """Check the signatures on a feed and return its data.

    A stream starting with "<?xml " carries its signature in a comment at
    the end and is handled by L{_check_xml_stream}; the stream returned is
    then the one passed in. A stream starting with '-----B' is passed
    through gpg --decrypt by L{_check_plain_stream} (deprecated); the
    stream returned is then a new one holding gpg's output. Anything else
    is rejected. stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if gpg is not in $PATH or the stream is not
    recognised"""
    if not find_in_path('gpg'):
        raise SafeException(_("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org"))

    # Peek at the first few bytes to decide how the feed is signed
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)

    if start == '-----B':
        import warnings
        warnings.warn(_("Plain GPG-signed feeds are deprecated!"), DeprecationWarning, stacklevel=2)
        os.lseek(stream.fileno(), 0, 0)
        return _check_plain_stream(stream)

    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'. 'errors' is always closed before returning or
    raising.
    @param status_r: iterable of status lines, each expected to look like
    '[GNUPG:] CODE ARGS...' and to end with a newline
    @param child: the gpg process; waited for once status_r is exhausted
    @param errors: seekable file holding gpg's stderr output
    @return: one L{ValidSig}, L{BadSig} or L{ErrSig} per VALIDSIG, BADSIG or
    ERRSIG line, in the order seen
    @raise SafeException: if a line is not a well-formed status line, or if
    there are no signatures (using errors for the error message if
    non-empty)"""
    prefix = '[GNUPG:] '
    sig_classes = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    try:
        for line in status_r:
            # Checked explicitly rather than with assert: asserts vanish
            # under -O, and the slicing below would then silently mangle
            # unexpected input.
            if not (line.startswith(prefix) and line.endswith('\n')):
                raise SafeException(_("Unexpected line in GPG status output: %s") % repr(line))
            split_line = line[len(prefix):-1].split(' ')
            code = split_line[0]
            args = split_line[1:]
            if code in sig_classes:
                sigs.append(sig_classes[code](args))

        child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        errors.close()

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        else:
            raise SafeException(_("No signatures found. No error messages from GPG."))

    return sigs