Re-added Policy.get_interface(), but with a deprecation warning.
[zeroinstall/zeroinstall-mseaborn.git] / zeroinstall / injector / gpg.py
blob2e7d6c07e769776d1b8a397555b588deac8e9e95
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2006, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 import base64, re
13 import os
14 import tempfile
15 import traceback
16 from logging import warn
17 from trust import trust_db
18 from model import SafeException
class Signature(object):
    """Common base class for the result of checking one signature.

    The defaults here are the pessimistic ones: the signature is never
    trusted and no key needs to be fetched. Subclasses override these.
    """
    # Whatever was passed to the constructor; None until an instance sets it.
    status = None

    def __init__(self, status):
        """Remember the status details this result was built from."""
        self.status = status

    def is_trusted(self, domain=None):
        """Return whether this signature is trusted (never, in the base class)."""
        return False

    def need_key(self):
        """Return the ID of the key that must be downloaded to check this
        signature, or None if no key is needed."""
        return None
class ValidSig(Signature):
    """A valid signature check result."""
    # Positions of the fields of interest within self.status.
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain=None):
        """Ask the trust database whether the signing key is trusted for domain."""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_timestamp(self):
        """Return the TIMESTAMP field of the status, converted to an int."""
        return int(self.status[self.TIMESTAMP])

    # The fingerprint of the key which made this signature.
    fingerprint = property(lambda self: self.status[self.FINGERPRINT])

    def get_details(self):
        """Run C{gpg --with-colons --list-keys} for this key's fingerprint.
        @return: one entry per line of output, each line split on ':'
        @rtype: [[str]]"""
        cin, cout = os.popen2(('gpg', '--with-colons', '--no-secmem-warning', '--list-keys', self.fingerprint))
        cin.close()
        # Close the output pipe even if reading fails part-way through.
        try:
            return [line.split(':') for line in cout]
        finally:
            cout.close()
class BadSig(Signature):
    """Result for a signature which does not match the signed message."""
    # Position of the signing key's ID within self.status.
    KEYID = 0

    def __str__(self):
        pieces = ("BAD signature by ",
                  self.status[self.KEYID],
                  " (the message has been tampered with)")
        return "".join(pieces)
class ErrSig(Signature):
    """Result for a signature which could not be checked at all."""
    # Positions within self.status; the reason code is the last field.
    KEYID = 0
    ALG = 1
    RC = -1

    def __str__(self):
        key_id = self.status[self.KEYID]
        prefix = "ERROR signature by %s: " % key_id
        reason_code = int(self.status[self.RC])
        if reason_code == 4:
            reason = "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
        elif reason_code == 9:
            reason = "Unknown key. Try 'gpg --recv-key %s'" % key_id
        else:
            reason = "Unknown reason code %d" % reason_code
        return prefix + reason

    def need_key(self):
        """Return the ID of the missing key when the reason code is 9
        (reported as "Unknown key" by __str__), otherwise None."""
        if int(self.status[self.RC]) == 9:
            return self.status[self.KEYID]
        return None
class Key:
    """A GPG key, identified by its fingerprint.
    @since: 0.27
    """
    def __init__(self, fingerprint):
        """Create a key whose name is '(unknown)' until someone sets it."""
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with anything from the first ' (' or ' <' onwards
        removed."""
        before_paren = self.name.split(' (', 1)[0]
        before_angle = before_paren.split(' <', 1)[0]
        return before_angle
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to load (any iterable of strings)
    @return: the loaded keys, indexed by fingerprint; a key GnuPG reports
    no user ID for keeps the name '(unknown)'
    @rtype: {str: L{Key}}
    @since: 0.27"""
    # Work on a private list: the values are appended to the gpg argument list
    # below (which needs a real list), and an iterator would otherwise always
    # look non-empty to the check that follows.
    fingerprints = list(fingerprints)

    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    cin, cout = os.popen2(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys',
                           '--with-fingerprint', '--with-fingerprint'] + fingerprints)
    cin.close()
    try:
        for line in cout:
            if line.startswith('pub:'):
                # A new primary key begins: forget the previous key's state.
                current_fpr = None
                current_uid = None
            elif line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    keys[current_fpr].name = current_uid
            elif line.startswith('uid:'):
                assert current_fpr is not None
                current_uid = line.split(':')[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        cout.close()

    return keys
def load_key(fingerprint):
    """Ask gpg about a single key, by way of L{load_keys}.
    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    @param stream: the key data; its fileno() becomes the child's stdin
    @raise SafeException: if gpg writes anything to stderr, or finishes with
    a non-zero wait status"""
    errors = tempfile.TemporaryFile()
    try:
        child = os.fork()
        if child == 0:
            # We are the child
            try:
                try:
                    os.dup2(stream.fileno(), 0)
                    os.dup2(errors.fileno(), 2)
                    os.execlp('gpg', 'gpg', '--no-secmem-warning', '--quiet', '--import')
                except:
                    # exec failed: stderr is already the errors file, so the
                    # parent will see this traceback as the error message.
                    traceback.print_exc()
            finally:
                # Never let the child return into the caller's code.
                os._exit(1)
            assert False

        pid, status = os.waitpid(child, 0)
        assert pid == child

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Only the parent gets here (the child always leaves via os._exit).
        errors.close()

    if error_messages:
        raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
    if status != 0:
        # gpg was killed or exited with an error without printing anything;
        # don't report that as a successful import.
        raise SafeException("'gpg --import' failed (wait status %d) without printing an error message" % status)
def _check_plain_stream(stream):
    """Pass a plain GPG-signed stream through C{gpg --decrypt}.
    @param stream: the signed input; its fileno() becomes gpg's stdin
    @return: (data, sigs), where data is a temporary file holding gpg's
    standard output (rewound to the start) and sigs is the list returned by
    L{_get_sigs_from_gpg_status_stream}"""
    output = tempfile.TemporaryFile()    # Python2.2 does not support 'prefix'
    gpg_errors = tempfile.TemporaryFile()

    read_end, write_end = os.pipe()

    pid = os.fork()
    if pid == 0:
        # We are the child
        try:
            try:
                os.close(read_end)
                os.dup2(stream.fileno(), 0)
                os.dup2(output.fileno(), 1)
                os.dup2(gpg_errors.fileno(), 2)
                gpg_args = ['gpg', '--no-secmem-warning', '--decrypt',
                            # Not all versions support this:
                            #'--max-output', str(1024 * 1024),
                            '--batch',
                            '--status-fd', str(write_end)]
                os.execvp('gpg', gpg_args)
            except:
                traceback.print_exc()
        finally:
            os._exit(1)
        assert False

    # We are the parent
    os.close(write_end)

    try:
        sigs = _get_sigs_from_gpg_status_stream(read_end, pid, gpg_errors)
    finally:
        output.seek(0)
    return (output, sigs)
216 def _check_xml_stream(stream):
217 xml_comment_start = '<!-- Base64 Signature'
219 data_to_check = stream.read()
221 last_comment = data_to_check.rfind('\n' + xml_comment_start)
222 if last_comment < 0:
223 raise SafeException("No signature block in XML. Maybe this file isn't signed?")
224 last_comment += 1 # Include new-line in data
226 data = tempfile.TemporaryFile()
227 data.write(data_to_check[:last_comment])
228 data.flush()
229 os.lseek(data.fileno(), 0, 0)
231 errors = tempfile.TemporaryFile()
233 sig_lines = data_to_check[last_comment:].split('\n')
234 if sig_lines[0].strip() != xml_comment_start:
235 raise SafeException('Bad signature block: extra data on comment line')
236 while sig_lines and not sig_lines[-1].strip():
237 del sig_lines[-1]
238 if sig_lines[-1].strip() != '-->':
239 raise SafeException('Bad signature block: last line is not end-of-comment')
240 sig_data = '\n'.join(sig_lines[1:-1])
242 if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
243 raise SafeException("Invalid characters found in base 64 encoded signature")
244 try:
245 sig_data = base64.decodestring(sig_data) # (b64decode is Python 2.4)
246 except Exception, ex:
247 raise SafeException("Invalid base 64 encoded signature: " + str(ex))
249 sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
250 try:
251 sig_file = os.fdopen(sig_fd, 'w')
252 sig_file.write(sig_data)
253 sig_file.close()
255 status_r, status_w = os.pipe()
257 child = os.fork()
259 if child == 0:
260 # We are the child
261 try:
262 try:
263 os.close(status_r)
264 os.dup2(data.fileno(), 0)
265 os.dup2(errors.fileno(), 2)
266 os.execlp('gpg', 'gpg', '--no-secmem-warning',
267 # Not all versions support this:
268 #'--max-output', str(1024 * 1024),
269 '--batch',
270 '--status-fd', str(status_w),
271 '--verify', sig_name, '-')
272 except:
273 traceback.print_exc()
274 finally:
275 os._exit(1)
276 assert False
278 # We are the parent
279 os.close(status_w)
281 try:
282 sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
283 finally:
284 os.lseek(stream.fileno(), 0, 0)
285 stream.seek(0)
286 finally:
287 os.unlink(sig_name)
288 return (stream, sigs)
def _find_in_path(prog):
    """Search the directories listed in $PATH for a file called prog.
    @param prog: the base name of the program to look for
    @return: the path of the first matching regular file, or None if no
    directory contains one"""
    # With no $PATH set, fall back to os.defpath, which is what the exec*p
    # family does, rather than failing with a KeyError.
    search_path = os.environ.get('PATH', os.defpath)
    for d in search_path.split(os.pathsep):
        path = os.path.join(d, prog)
        if os.path.isfile(path):
            return path
    return None
def check_stream(stream):
    """Check the signatures on a feed, dispatching on how the stream starts.

    If stream starts with "<?xml " the signature is taken from a comment at
    the end and the returned data is the original stream. If it starts with
    '-----B' it is passed through gpg --decrypt and the returned data is
    gpg's output. Anything else is rejected. stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if gpg is not in $PATH or the stream is not a feed"""
    if not _find_in_path('gpg'):
        raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")

    # Peek at the first few bytes, leaving the stream rewound afterwards.
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)

    if start != '-----B':
        raise SafeException("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s" % repr(stream.read(120)))

    warn("Plain GPG-signed feeds are deprecated!")
    os.lseek(stream.fileno(), 0, 0)
    return _check_plain_stream(stream)
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).
    @param status_r: file descriptor to read gpg's status messages from;
    it is closed before returning
    @param child: process ID of the gpg child to wait for
    @param errors: file object holding gpg's stderr output; it is read and
    closed here
    @return: one L{Signature} per VALIDSIG, BADSIG or ERRSIG message, in
    the order they were reported
    @raise SafeException: if no signature was reported"""
    # Status codes we care about, and the result class for each.
    sig_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    status = os.fdopen(status_r)
    try:
        for line in status:
            assert line.endswith('\n')
            assert line.startswith('[GNUPG:] ')
            # Strip the '[GNUPG:] ' prefix and the trailing newline.
            split_line = line[9:-1].split(' ')
            sig_type = sig_types.get(split_line[0])
            if sig_type is not None:
                sigs.append(sig_type(split_line[1:]))
    finally:
        # Close the pipe first, so gpg can never block writing to it, and
        # then reap the child even if parsing failed, so no zombie is left.
        status.close()
        pid, exit_status = os.waitpid(child, 0)
    assert pid == child

    errors.seek(0)

    error_messages = errors.read().strip()
    errors.close()

    if not sigs:
        if error_messages:
            raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
        else:
            raise SafeException("No signatures found. No error messages from GPG.")

    return sigs