Added a feed for Zero Install itself.
[zeroinstall.git] / zeroinstall / injector / gpg.py
blobf61dd5f68f860716c95ad987eb919ff160fcae3d
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2006, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 import base64, re
13 import os
14 import tempfile
15 import traceback
16 from logging import warn
17 from trust import trust_db
18 from model import SafeException
20 class Signature(object):
21 """Abstract base class for signature check results."""
22 status = None
24 def __init__(self, status):
25 self.status = status
27 def is_trusted(self, domain = None):
28 return False
30 def need_key(self):
31 """Returns the ID of the key that must be downloaded to check this signature."""
32 return None
34 class ValidSig(Signature):
35 """A valid signature check result."""
36 FINGERPRINT = 0
37 TIMESTAMP = 2
39 def __str__(self):
40 return "Valid signature from " + self.status[self.FINGERPRINT]
42 def is_trusted(self, domain = None):
43 return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)
45 def get_timestamp(self):
46 return int(self.status[self.TIMESTAMP])
48 fingerprint = property(lambda self: self.status[self.FINGERPRINT])
50 def get_details(self):
51 cin, cout = os.popen2(('gpg', '--with-colons', '--no-secmem-warning', '--list-keys', self.fingerprint))
52 cin.close()
53 details = []
54 for line in cout:
55 details.append(line.split(':'))
56 cout.close()
57 return details
59 class BadSig(Signature):
60 """A bad signature (doesn't match the message)."""
61 KEYID = 0
63 def __str__(self):
64 return "BAD signature by " + self.status[self.KEYID] + \
65 " (the message has been tampered with)"
67 class ErrSig(Signature):
68 """Error while checking a signature."""
69 KEYID = 0
70 ALG = 1
71 RC = -1
73 def __str__(self):
74 msg = "ERROR signature by %s: " % self.status[self.KEYID]
75 rc = int(self.status[self.RC])
76 if rc == 4:
77 msg += "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
78 elif rc == 9:
79 msg += "Unknown key. Try 'gpg --recv-key %s'" % self.status[self.KEYID]
80 else:
81 msg += "Unknown reason code %d" % rc
82 return msg
84 def need_key(self):
85 rc = int(self.status[self.RC])
86 if rc == 9:
87 return self.status[self.KEYID]
88 return None
90 class Key:
91 """A GPG key.
92 @since: 0.27
93 """
94 def __init__(self, fingerprint):
95 self.fingerprint = fingerprint
96 self.name = '(unknown)'
98 def get_short_name(self):
99 return self.name.split(' (', 1)[0].split(' <', 1)[0]
101 def load_keys(fingerprints):
102 """Load a set of keys at once.
103 This is much more efficient than making individual calls to L{load_key}.
104 @return: a list of loaded keys, indexed by fingerprint
105 @rtype: {str: L{Key}}
106 @since 0.27"""
108 keys = {}
110 # Otherwise GnuPG returns everything...
111 if not fingerprints: return keys
113 for fp in fingerprints:
114 keys[fp] = Key(fp)
116 current_fpr = None
118 cin, cout = os.popen2(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys', '--with-fingerprint'] + fingerprints)
119 cin.close()
120 try:
121 for line in cout:
122 if line.startswith('pub:'):
123 current_fpr = None
124 if line.startswith('fpr:'):
125 assert current_fpr is None
126 current_fpr = line.split(':')[9]
127 if line.startswith('uid:'):
128 assert current_fpr is not None
129 parts = line.split(':')
130 if current_fpr in keys:
131 keys[current_fpr].name = parts[9]
132 else:
133 warn("Got information about key '%s', but I only asked about '%s'!", current_fpr, fingerprints)
134 finally:
135 cout.close()
137 return keys
139 def load_key(fingerprint):
140 """Query gpg for information about this key.
141 @return: a new key
142 @rtype: L{Key}
143 @since: 0.27"""
144 return load_keys([fingerprint])[fingerprint]
146 def import_key(stream):
147 """Run C{gpg --import} with this stream as stdin."""
148 errors = tempfile.TemporaryFile()
150 child = os.fork()
151 if child == 0:
152 # We are the child
153 try:
154 try:
155 os.dup2(stream.fileno(), 0)
156 os.dup2(errors.fileno(), 2)
157 os.execlp('gpg', 'gpg', '--no-secmem-warning', '--quiet', '--import')
158 except:
159 traceback.print_exc()
160 finally:
161 os._exit(1)
162 assert False
164 pid, status = os.waitpid(child, 0)
165 assert pid == child
167 errors.seek(0)
168 error_messages = errors.read().strip()
169 errors.close()
171 if error_messages:
172 raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
174 def _check_plain_stream(stream):
175 data = tempfile.TemporaryFile() # Python2.2 does not support 'prefix'
176 errors = tempfile.TemporaryFile()
178 status_r, status_w = os.pipe()
180 child = os.fork()
182 if child == 0:
183 # We are the child
184 try:
185 try:
186 os.close(status_r)
187 os.dup2(stream.fileno(), 0)
188 os.dup2(data.fileno(), 1)
189 os.dup2(errors.fileno(), 2)
190 os.execlp('gpg', 'gpg', '--no-secmem-warning', '--decrypt',
191 # Not all versions support this:
192 #'--max-output', str(1024 * 1024),
193 '--batch',
194 '--status-fd', str(status_w))
195 except:
196 traceback.print_exc()
197 finally:
198 os._exit(1)
199 assert False
201 # We are the parent
202 os.close(status_w)
204 try:
205 sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
206 finally:
207 data.seek(0)
208 return (data, sigs)
210 def _check_xml_stream(stream):
211 xml_comment_start = '<!-- Base64 Signature'
213 data_to_check = stream.read()
215 last_comment = data_to_check.rfind('\n' + xml_comment_start)
216 if last_comment < 0:
217 raise SafeException("No signature block in XML. Maybe this file isn't signed?")
218 last_comment += 1 # Include new-line in data
220 data = tempfile.TemporaryFile()
221 data.write(data_to_check[:last_comment])
222 data.flush()
223 os.lseek(data.fileno(), 0, 0)
225 errors = tempfile.TemporaryFile()
227 sig_lines = data_to_check[last_comment:].split('\n')
228 if sig_lines[0].strip() != xml_comment_start:
229 raise SafeException('Bad signature block: extra data on comment line')
230 while sig_lines and not sig_lines[-1].strip():
231 del sig_lines[-1]
232 if sig_lines[-1].strip() != '-->':
233 raise SafeException('Bad signature block: last line is not end-of-comment')
234 sig_data = '\n'.join(sig_lines[1:-1])
236 if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
237 raise SafeException("Invalid characters found in base 64 encoded signature")
238 try:
239 sig_data = base64.decodestring(sig_data) # (b64decode is Python 2.4)
240 except Exception, ex:
241 raise SafeException("Invalid base 64 encoded signature: " + str(ex))
243 sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
244 try:
245 sig_file = os.fdopen(sig_fd, 'w')
246 sig_file.write(sig_data)
247 sig_file.close()
249 status_r, status_w = os.pipe()
251 child = os.fork()
253 if child == 0:
254 # We are the child
255 try:
256 try:
257 os.close(status_r)
258 os.dup2(data.fileno(), 0)
259 os.dup2(errors.fileno(), 2)
260 os.execlp('gpg', 'gpg', '--no-secmem-warning',
261 # Not all versions support this:
262 #'--max-output', str(1024 * 1024),
263 '--batch',
264 '--status-fd', str(status_w),
265 '--verify', sig_name, '-')
266 except:
267 traceback.print_exc()
268 finally:
269 os._exit(1)
270 assert False
272 # We are the parent
273 os.close(status_w)
275 try:
276 sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
277 finally:
278 os.lseek(stream.fileno(), 0, 0)
279 stream.seek(0)
280 finally:
281 os.unlink(sig_name)
282 return (stream, sigs)
284 def _find_in_path(prog):
285 for d in os.environ['PATH'].split(':'):
286 path = os.path.join(d, prog)
287 if os.path.isfile(path):
288 return path
289 return None
291 def check_stream(stream):
292 """Pass stream through gpg --decrypt to get the data, the error text,
293 and a list of signatures (good or bad). If stream starts with "<?xml "
294 then get the signature from a comment at the end instead (and the returned
295 data is the original stream). stream must be seekable.
296 @note: Stream returned may or may not be the one passed in. Be careful!
297 @return: (data_stream, [Signatures])"""
298 if not _find_in_path('gpg'):
299 raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")
301 #stream.seek(0)
302 #all = stream.read()
303 stream.seek(0)
305 start = stream.read(6)
306 stream.seek(0)
307 if start == "<?xml ":
308 return _check_xml_stream(stream)
309 else:
310 os.lseek(stream.fileno(), 0, 0)
311 return _check_plain_stream(stream)
313 def _get_sigs_from_gpg_status_stream(status_r, child, errors):
314 """Read messages from status_r and collect signatures from it.
315 When done, reap 'child'.
316 If there are no signatures, throw SafeException (using errors
317 for the error message if non-empty)."""
318 sigs = []
320 # Should we error out on bad signatures, even if there's a good
321 # signature too?
323 for line in os.fdopen(status_r):
324 assert line.endswith('\n')
325 assert line.startswith('[GNUPG:] ')
326 line = line[9:-1]
327 split_line = line.split(' ')
328 code = split_line[0]
329 args = split_line[1:]
330 if code == 'VALIDSIG':
331 sigs.append(ValidSig(args))
332 elif code == 'BADSIG':
333 sigs.append(BadSig(args))
334 elif code == 'ERRSIG':
335 sigs.append(ErrSig(args))
337 pid, status = os.waitpid(child, 0)
338 assert pid == child
340 errors.seek(0)
342 error_messages = errors.read().strip()
343 errors.close()
345 if not sigs:
346 if error_messages:
347 raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
348 else:
349 raise SafeException("No signatures found. No error messages from GPG.")
351 return sigs