Use subprocess instead of fork when running gpg.
[zeroinstall.git] / zeroinstall / injector / gpg.py
blob0342a75c554cd6cf178e89cb8de0fecb2325936f
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2006, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 import subprocess
13 import base64, re
14 import os
15 import tempfile
16 import traceback
17 from logging import info
19 from zeroinstall import support
20 from zeroinstall.injector.trust import trust_db
21 from zeroinstall.injector.model import SafeException
class Signature(object):
    """Base class for the outcome of checking a single signature.

    The base implementation is never trusted and never asks for a key;
    subclasses override these answers where appropriate.

    @ivar status: the value handed to the constructor (subclasses index
    into it to pull out individual fields)
    """
    status = None

    def __init__(self, status):
        self.status = status

    def need_key(self):
        """Return the ID of the key that has to be downloaded before this
        signature can be checked. The base class always returns None."""
        return None

    def is_trusted(self, domain = None):
        """Report whether the user trusts this signature.
        The base class always answers False."""
        return False
class ValidSig(Signature):
    """Result of a signature check that succeeded.

    C{status} is indexed with the class constants below to find the key
    fingerprint and the time the signature was made.
    """
    FINGERPRINT = 0
    TIMESTAMP = 2

    @property
    def fingerprint(self):
        return self.status[self.FINGERPRINT]

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def get_timestamp(self):
        """Return the time at which the signature was made, as an int."""
        return int(self.status[self.TIMESTAMP])

    def is_trusted(self, domain = None):
        """Look this key's fingerprint up in L{trust.trust_db}."""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_details(self):
        """Run 'gpg --list-keys' for this fingerprint and return its output.
        Each output line (split on new-lines) becomes a list of its
        colon-separated fields. A non-zero exit status from gpg is only
        logged; whatever was printed is still returned.
        @rtype: [[str]]"""
        gpg_args = ['gpg', '--with-colons', '--no-secmem-warning', '--list-keys', self.fingerprint]
        child = subprocess.Popen(gpg_args, stdout = subprocess.PIPE)
        output = child.communicate()[0]
        if child.returncode:
            info("GPG exited with code %d" % child.returncode)
        return [row.split(':') for row in output.split('\n')]
class BadSig(Signature):
    """Result for a signature that does not match the message it is
    attached to."""
    KEYID = 0

    def __str__(self):
        key_id = self.status[self.KEYID]
        return "BAD signature by " + key_id + " (the message has been tampered with)"
class ErrSig(Signature):
    """Result for a signature that could not be checked at all.

    The reason code is the last field of C{status}: 4 is reported as an
    unknown or unsupported algorithm, 9 as an unknown key.
    """
    KEYID = 0
    ALG = 1
    RC = -1

    def __str__(self):
        prefix = "ERROR signature by %s: " % self.status[self.KEYID]
        rc = int(self.status[self.RC])
        if rc == 4:
            return prefix + "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
        if rc == 9:
            return prefix + "Unknown key. Try 'gpg --recv-key %s'" % self.status[self.KEYID]
        return prefix + "Unknown reason code %d" % rc

    def need_key(self):
        """Return the ID of the missing key when the check failed because
        the key is unknown (reason code 9); otherwise return None."""
        if int(self.status[self.RC]) == 9:
            return self.status[self.KEYID]
        return None
class Key:
    """A GPG key.
    @since: 0.27
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: the key's full name; starts out as '(unknown)'
    @type name: str
    """
    def __init__(self, fingerprint):
        self.name = '(unknown)'
        self.fingerprint = fingerprint

    def get_short_name(self):
        """Return the name with any trailing ' (comment)' and ' <address>'
        parts cut off."""
        without_comment = self.name.split(' (', 1)[0]
        without_address = without_comment.split(' <', 1)[0]
        return without_address
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.

    Runs 'gpg --list-keys' once for all the fingerprints and copies the user
    ID reported for each one into the matching L{Key}. A key that gpg does
    not report keeps the default name.

    @param fingerprints: the fingerprints to look up
    @type fingerprints: [str]
    @return: the loaded keys, indexed by fingerprint
    @rtype: {str: L{Key}}
    @since: 0.27"""
    keys = {}

    # With no fingerprints on the command line GnuPG would list every key.
    if not fingerprints:
        return keys

    # Any sequence is accepted; a real list is needed to extend the argv below.
    fingerprints = list(fingerprints)

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    # Use subprocess like the rest of this module (os.popen2 is deprecated).
    child = subprocess.Popen(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys',
                              '--with-fingerprint', '--with-fingerprint'] + fingerprints,
                             stdin = subprocess.PIPE, stdout = subprocess.PIPE)
    # gpg gets an immediate end-of-file on stdin, as it did with os.popen2.
    child.stdin.close()
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # Start of a new primary key: forget the previous one.
                current_fpr = None
                current_uid = None
            elif line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    keys[current_fpr].name = current_uid
            elif line.startswith('uid:'):
                assert current_fpr is not None
                current_uid = line.split(':')[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        child.stdout.close()
        # Always reap the child so that no zombie process is left behind.
        child.wait()

    if child.returncode:
        info("GPG exited with code %d" % child.returncode)

    return keys
def load_key(fingerprint):
    """Ask gpg about a single key.
    Convenience wrapper around L{load_keys} for one fingerprint.
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.

    Failure is detected from gpg's stderr output only: the exit status is
    not consulted.

    @param stream: passed to the child process as its standard input
    @raise SafeException: if gpg wrote anything (other than whitespace)
    to stderr"""
    errors = tempfile.TemporaryFile()
    try:
        child = subprocess.Popen(['gpg', '--no-secmem-warning', '--quiet', '--import'],
                                 stdin = stream, stderr = errors)
        child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file even if gpg could not be started.
        errors.close()

    if error_messages:
        raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
def _check_plain_stream(stream):
    """Feed a plain GPG-signed stream through 'gpg --decrypt'.

    gpg's output is collected in a temporary file, which is rewound and
    returned together with the signatures read from gpg's status output.

    @return: (data_stream, [Signatures])"""
    data = tempfile.TemporaryFile()        # Python2.2 does not support 'prefix'
    errors = tempfile.TemporaryFile()

    status_r, status_w = os.pipe()

    gpg_args = ['gpg', '--no-secmem-warning', '--decrypt',
                # Not all versions support this:
                #'--max-output', str(1024 * 1024),
                '--batch',
                '--status-fd', str(status_w)]

    # Note: Should ideally close status_r in the child, but we want to support Windows too
    child = subprocess.Popen(gpg_args, stdin = stream, stdout = data, stderr = errors)

    # Only the child writes to the status pipe; drop our copy of the write end.
    os.close(status_w)

    try:
        sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
    finally:
        # Rewind the collected output, whether or not the check succeeded.
        data.seek(0)
    return (data, sigs)
def _check_xml_stream(stream):
    """Check an XML document whose signature is in a trailing comment.

    The signature is the base64 text inside the last
    '<!-- Base64 Signature' comment. Everything before that comment is
    passed to 'gpg --verify' together with the decoded signature.

    @param stream: the document; it is read completely and rewound again
    before returning
    @return: (stream, [Signatures]) -- the stream is the one passed in
    @raise SafeException: if the signature block is missing or malformed,
    or (via the status parser) if gpg reports no signatures"""
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException("No signature block in XML. Maybe this file isn't signed?")
    last_comment += 1    # Include new-line in data

    # Validate and decode the signature block before creating any temporary
    # files, so that malformed input leaves nothing behind to clean up.
    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException('Bad signature block: extra data on comment line')
    # Ignore blank lines after the end of the comment.
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException('Bad signature block: last line is not end-of-comment')
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException("Invalid characters found in base 64 encoded signature")
    try:
        sig_data = base64.b64decode(sig_data)
    except Exception as ex:
        raise SafeException("Invalid base 64 encoded signature: " + str(ex))

    data = tempfile.TemporaryFile()
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        os.lseek(data.fileno(), 0, 0)

        sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
        try:
            # The decoded signature is binary: write it in binary mode so
            # that new-line translation cannot corrupt it on Windows.
            sig_file = os.fdopen(sig_fd, 'wb')
            try:
                sig_file.write(sig_data)
            finally:
                sig_file.close()

            errors = tempfile.TemporaryFile()
            status_r, status_w = os.pipe()

            # Note: Should ideally close status_r in the child, but we want to support Windows too
            child = subprocess.Popen(['gpg', '--no-secmem-warning',
                                      # Not all versions support this:
                                      #'--max-output', str(1024 * 1024),
                                      '--batch',
                                      '--status-fd', str(status_w),
                                      '--verify', sig_name, '-'],
                                     stdin = data,
                                     stderr = errors)

            # Only the child writes to the status pipe.
            os.close(status_w)

            try:
                sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
            finally:
                # Rewind the caller's stream at both the fd and file-object level.
                os.lseek(stream.fileno(), 0, 0)
                stream.seek(0)
        finally:
            os.unlink(sig_name)
    finally:
        # The child has its own copy of this descriptor; ours is no longer needed.
        data.close()
    return (stream, sigs)
def check_stream(stream):
    """Check the signatures on a feed and return its data.

    If stream starts with "<?xml " the signature is taken from a comment
    at the end of the document and the returned data is the original
    stream. If it starts with '-----B' it is treated as a (deprecated)
    plain GPG-signed feed and passed through gpg --decrypt to get the
    data. Anything else is rejected. stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if gpg is not in $PATH, or the stream is
    neither kind of feed"""
    if not support.find_in_path('gpg'):
        raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")

    # Peek at the first few bytes to decide which kind of feed this is.
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)

    if start != '-----B':
        raise SafeException("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s" % repr(stream.read(120)))

    import warnings
    warnings.warn("Plain GPG-signed feeds are deprecated!", DeprecationWarning, stacklevel = 2)
    # Rewind the underlying descriptor too, since gpg reads from it directly.
    os.lseek(stream.fileno(), 0, 0)
    return _check_plain_stream(stream)
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).

    The status stream is closed, the child reaped and 'errors' closed
    even if parsing the status output fails.

    @param status_r: file descriptor for the read end of the status pipe
    @param child: the gpg process, as returned by subprocess.Popen
    @param errors: file object that received gpg's stderr; closed here
    @return: one Signature object for each VALIDSIG, BADSIG or ERRSIG
    status line, in the order they were reported
    @raise SafeException: if no such status line was seen"""
    # Status codes that describe a signature, and the class used for each.
    sig_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }

    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    try:
        status_stream = os.fdopen(status_r)
        try:
            for line in status_stream:
                assert line.endswith('\n')
                assert line.startswith('[GNUPG:] ')
                # Strip the '[GNUPG:] ' prefix and the trailing new-line.
                split_line = line[9:-1].split(' ')
                code = split_line[0]
                args = split_line[1:]
                if code in sig_types:
                    sigs.append(sig_types[code](args))
        finally:
            status_stream.close()
            child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        errors.close()

    if not sigs:
        if error_messages:
            raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
        else:
            raise SafeException("No signatures found. No error messages from GPG.")

    return sigs