Update year to 2009 in various places
[zeroinstall/zeroinstall-rsl.git] / zeroinstall / injector / gpg.py
blob772959eafc62d22802c6a01ba940099704ff73fe
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 import subprocess
13 import base64, re
14 import os
15 import tempfile
16 from logging import info
18 from zeroinstall.support import find_in_path, basedir
19 from zeroinstall.injector.trust import trust_db
20 from zeroinstall.injector.model import SafeException
22 _gnupg_options = ['gpg', '--no-secmem-warning']
23 if hasattr(os, 'geteuid') and os.geteuid() == 0 and 'GNUPGHOME' not in os.environ:
24 _gnupg_options += ['--homedir', os.path.join(basedir.home, '.gnupg')]
25 info("Running as root, so setting GnuPG home to %s", _gnupg_options[-1])
27 class Signature(object):
28 """Abstract base class for signature check results."""
29 status = None
31 def __init__(self, status):
32 self.status = status
34 def is_trusted(self, domain = None):
35 """Whether this signature is trusted by the user."""
36 return False
38 def need_key(self):
39 """Returns the ID of the key that must be downloaded to check this signature."""
40 return None
42 class ValidSig(Signature):
43 """A valid signature check result."""
44 FINGERPRINT = 0
45 TIMESTAMP = 2
47 def __str__(self):
48 return "Valid signature from " + self.status[self.FINGERPRINT]
50 def is_trusted(self, domain = None):
51 """Asks the L{trust.trust_db}."""
52 return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)
54 def get_timestamp(self):
55 """Get the time this signature was made."""
56 return int(self.status[self.TIMESTAMP])
58 fingerprint = property(lambda self: self.status[self.FINGERPRINT])
60 def get_details(self):
61 """Call 'gpg --list-keys' and return the results split into lines and columns.
62 @rtype: [[str]]"""
63 child = subprocess.Popen(_gnupg_options + ['--with-colons', '--list-keys', self.fingerprint], stdout = subprocess.PIPE)
64 cout, unused = child.communicate()
65 if child.returncode:
66 info("GPG exited with code %d" % child.returncode)
67 details = []
68 for line in cout.split('\n'):
69 details.append(line.split(':'))
70 return details
72 class BadSig(Signature):
73 """A bad signature (doesn't match the message)."""
74 KEYID = 0
76 def __str__(self):
77 return "BAD signature by " + self.status[self.KEYID] + \
78 " (the message has been tampered with)"
80 class ErrSig(Signature):
81 """Error while checking a signature."""
82 KEYID = 0
83 ALG = 1
84 RC = -1
86 def __str__(self):
87 msg = "ERROR signature by %s: " % self.status[self.KEYID]
88 rc = int(self.status[self.RC])
89 if rc == 4:
90 msg += "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
91 elif rc == 9:
92 msg += "Unknown key. Try 'gpg --recv-key %s'" % self.status[self.KEYID]
93 else:
94 msg += "Unknown reason code %d" % rc
95 return msg
97 def need_key(self):
98 rc = int(self.status[self.RC])
99 if rc == 9:
100 return self.status[self.KEYID]
101 return None
103 class Key:
104 """A GPG key.
105 @since: 0.27
106 @param fingerprint: the fingerprint of the key
107 @type fingerprint: str
108 @ivar name: a short name for the key, extracted from the full name
109 @type name: str
111 def __init__(self, fingerprint):
112 self.fingerprint = fingerprint
113 self.name = '(unknown)'
115 def get_short_name(self):
116 return self.name.split(' (', 1)[0].split(' <', 1)[0]
118 def load_keys(fingerprints):
119 """Load a set of keys at once.
120 This is much more efficient than making individual calls to L{load_key}.
121 @return: a list of loaded keys, indexed by fingerprint
122 @rtype: {str: L{Key}}
123 @since: 0.27"""
125 keys = {}
127 # Otherwise GnuPG returns everything...
128 if not fingerprints: return keys
130 for fp in fingerprints:
131 keys[fp] = Key(fp)
133 current_fpr = None
134 current_uid = None
136 cin, cout = os.popen2(_gnupg_options + ['--fixed-list-mode', '--with-colons', '--list-keys',
137 '--with-fingerprint', '--with-fingerprint'] + fingerprints)
138 cin.close()
139 try:
140 for line in cout:
141 if line.startswith('pub:'):
142 current_fpr = None
143 current_uid = None
144 if line.startswith('fpr:'):
145 current_fpr = line.split(':')[9]
146 if current_fpr in keys and current_uid:
147 # This is probably a subordinate key, where the fingerprint
148 # comes after the uid, not before. Note: we assume the subkey is
149 # cross-certified, as recent always ones are.
150 keys[current_fpr].name = current_uid
151 if line.startswith('uid:'):
152 assert current_fpr is not None
153 # Only take primary UID
154 if current_uid: continue
155 parts = line.split(':')
156 current_uid = parts[9]
157 if current_fpr in keys:
158 keys[current_fpr].name = current_uid
159 finally:
160 cout.close()
162 return keys
164 def load_key(fingerprint):
165 """Query gpg for information about this key.
166 @return: a new key
167 @rtype: L{Key}
168 @since: 0.27"""
169 return load_keys([fingerprint])[fingerprint]
171 def import_key(stream):
172 """Run C{gpg --import} with this stream as stdin."""
173 errors = tempfile.TemporaryFile()
175 child = subprocess.Popen(_gnupg_options + ['--quiet', '--import'],
176 stdin = stream, stderr = errors)
178 status = child.wait()
180 errors.seek(0)
181 error_messages = errors.read().strip()
182 errors.close()
184 if error_messages:
185 raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
187 def _check_plain_stream(stream):
188 data = tempfile.TemporaryFile() # Python2.2 does not support 'prefix'
189 errors = tempfile.TemporaryFile()
191 status_r, status_w = os.pipe()
193 # Note: Should ideally close status_r in the child, but we want to support Windows too
194 child = subprocess.Popen(_gnupg_options + ['--decrypt',
195 # Not all versions support this:
196 #'--max-output', str(1024 * 1024),
197 '--batch',
198 '--status-fd', str(status_w)],
199 stdin = stream,
200 stdout = data,
201 stderr = errors)
203 os.close(status_w)
205 try:
206 sigs = _get_sigs_from_gpg_status_stream(os.fdopen(status_r), child, errors)
207 finally:
208 data.seek(0)
209 return (data, sigs)
211 def _check_xml_stream(stream):
212 xml_comment_start = '<!-- Base64 Signature'
214 data_to_check = stream.read()
216 last_comment = data_to_check.rfind('\n' + xml_comment_start)
217 if last_comment < 0:
218 raise SafeException("No signature block in XML. Maybe this file isn't signed?")
219 last_comment += 1 # Include new-line in data
221 data = tempfile.TemporaryFile()
222 data.write(data_to_check[:last_comment])
223 data.flush()
224 os.lseek(data.fileno(), 0, 0)
226 errors = tempfile.TemporaryFile()
228 sig_lines = data_to_check[last_comment:].split('\n')
229 if sig_lines[0].strip() != xml_comment_start:
230 raise SafeException('Bad signature block: extra data on comment line')
231 while sig_lines and not sig_lines[-1].strip():
232 del sig_lines[-1]
233 if sig_lines[-1].strip() != '-->':
234 raise SafeException('Bad signature block: last line is not end-of-comment')
235 sig_data = '\n'.join(sig_lines[1:-1])
237 if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
238 raise SafeException("Invalid characters found in base 64 encoded signature")
239 try:
240 sig_data = base64.decodestring(sig_data) # (b64decode is Python 2.4)
241 except Exception, ex:
242 raise SafeException("Invalid base 64 encoded signature: " + str(ex))
244 sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
245 try:
246 sig_file = os.fdopen(sig_fd, 'w')
247 sig_file.write(sig_data)
248 sig_file.close()
250 # Note: Should ideally close status_r in the child, but we want to support Windows too
251 child = subprocess.Popen(_gnupg_options + [
252 # Not all versions support this:
253 #'--max-output', str(1024 * 1024),
254 '--batch',
255 # Windows GPG can only cope with "1" here
256 '--status-fd', '1',
257 '--verify', sig_name, '-'],
258 stdin = data,
259 stdout = subprocess.PIPE,
260 stderr = errors)
262 try:
263 sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
264 finally:
265 os.lseek(stream.fileno(), 0, 0)
266 stream.seek(0)
267 finally:
268 os.unlink(sig_name)
269 return (stream, sigs)
271 def check_stream(stream):
272 """Pass stream through gpg --decrypt to get the data, the error text,
273 and a list of signatures (good or bad). If stream starts with "<?xml "
274 then get the signature from a comment at the end instead (and the returned
275 data is the original stream). stream must be seekable.
276 @note: Stream returned may or may not be the one passed in. Be careful!
277 @return: (data_stream, [Signatures])"""
278 if not find_in_path('gpg'):
279 raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")
281 #stream.seek(0)
282 #all = stream.read()
283 stream.seek(0)
285 start = stream.read(6)
286 stream.seek(0)
287 if start == "<?xml ":
288 return _check_xml_stream(stream)
289 elif start == '-----B':
290 import warnings
291 warnings.warn("Plain GPG-signed feeds are deprecated!", DeprecationWarning, stacklevel = 2)
292 os.lseek(stream.fileno(), 0, 0)
293 return _check_plain_stream(stream)
294 else:
295 raise SafeException("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s" % repr(stream.read(120)))
297 def _get_sigs_from_gpg_status_stream(status_r, child, errors):
298 """Read messages from status_r and collect signatures from it.
299 When done, reap 'child'.
300 If there are no signatures, throw SafeException (using errors
301 for the error message if non-empty)."""
302 sigs = []
304 # Should we error out on bad signatures, even if there's a good
305 # signature too?
307 for line in status_r:
308 assert line.endswith('\n')
309 assert line.startswith('[GNUPG:] ')
310 line = line[9:-1]
311 split_line = line.split(' ')
312 code = split_line[0]
313 args = split_line[1:]
314 if code == 'VALIDSIG':
315 sigs.append(ValidSig(args))
316 elif code == 'BADSIG':
317 sigs.append(BadSig(args))
318 elif code == 'ERRSIG':
319 sigs.append(ErrSig(args))
321 status = child.wait()
323 errors.seek(0)
325 error_messages = errors.read().strip()
326 errors.close()
328 if not sigs:
329 if error_messages:
330 raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
331 else:
332 raise SafeException("No signatures found. No error messages from GPG.")
334 return sigs