Added load_keys() as a faster alternative to load_key().
[zeroinstall.git] / zeroinstall / injector / gpg.py
blob18aa21e95856420a11fed10b38c89cfddbceb32f
"""
Python interface to GnuPG.

This module is used to invoke GnuPG to check the digital signatures on interfaces.

@see: L{iface_cache.PendingFeed}
"""
# Copyright (C) 2006, Thomas Leonard
# See the README file for details, or visit http://0install.net.
12 import base64, re
13 import os
14 import tempfile
15 import traceback
16 from trust import trust_db
17 from model import SafeException
class Signature:
    """Base class for the outcome of checking a single signature.

    The defaults defined here describe a result that is not trusted and
    that does not require any key to be fetched; subclasses override the
    methods where their kind of result differs.

    @ivar status: the value handed to the constructor (None on the class itself)
    """
    status = None

    def __init__(self, status):
        """Remember the status value describing this result.
        @param status: stored unchanged as C{self.status}"""
        self.status = status

    def is_trusted(self, domain = None):
        """Report whether this result is trusted.
        @param domain: accepted for interface compatibility; ignored here
        @return: always False in the base class"""
        return False

    def need_key(self):
        """Return the ID of the key that must be downloaded before this
        signature can be checked.
        @return: always None in the base class (no key is needed)"""
        return None
class ValidSig(Signature):
    """A valid signature check result.

    C{status} is indexed with the class constants below: element
    C{FINGERPRINT} is the signing key's fingerprint and element
    C{TIMESTAMP} is the signature timestamp.
    """
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain = None):
        """Ask the trust database whether the signing key is trusted.
        @param domain: passed through unchanged to C{trust_db.is_trusted}
        @return: whatever C{trust_db.is_trusted} returns"""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_timestamp(self):
        """Return the timestamp field of the status as an integer.
        @rtype: int"""
        return int(self.status[self.TIMESTAMP])

    # Read-only shortcut for the fingerprint element of the status.
    fingerprint = property(lambda self: self.status[self.FINGERPRINT])

    def get_details(self):
        """Run C{gpg --with-colons --list-keys} for this key's fingerprint.
        @return: one list of colon-separated fields per line of gpg output;
        lines are not stripped, so the last field of each keeps its newline
        @rtype: [[str]]"""
        cin, cout = os.popen2(('gpg', '--with-colons', '--no-secmem-warning', '--list-keys', self.fingerprint))
        cin.close()
        try:
            return [line.split(':') for line in cout]
        finally:
            # Close the pipe even if reading from gpg fails part-way.
            cout.close()
class BadSig(Signature):
    """Result for a signature that does not match the message.

    Element C{KEYID} of C{status} is the ID of the key named in the
    signature.
    """
    KEYID = 0

    def __str__(self):
        key_id = self.status[self.KEYID]
        return "BAD signature by " + key_id + " (the message has been tampered with)"
class ErrSig(Signature):
    """Result for a signature that could not be checked at all.

    C{status} is indexed with the class constants below: C{KEYID} is the
    key ID, C{ALG} the algorithm and C{RC} (the last element) the numeric
    reason code.
    """
    KEYID = 0
    ALG = 1
    RC = -1

    def __str__(self):
        key_id = self.status[self.KEYID]
        rc = int(self.status[self.RC])
        # Reason codes 4 and 9 get specific explanations; anything else
        # is reported by number.
        if rc == 4:
            reason = "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
        elif rc == 9:
            reason = "Unknown key. Try 'gpg --recv-key %s'" % key_id
        else:
            reason = "Unknown reason code %d" % rc
        return ("ERROR signature by %s: " % key_id) + reason

    def need_key(self):
        """Return the ID of the key that must be downloaded to check this
        signature.
        @return: the key ID when the reason code is 9 (unknown key), else None"""
        if int(self.status[self.RC]) == 9:
            return self.status[self.KEYID]
        return None
class Key:
    """A GPG key, identified by its fingerprint.
    @ivar fingerprint: the fingerprint given to the constructor
    @ivar name: the key's name; C{'(unknown)'} until something sets it
    @since: 0.27
    """
    def __init__(self, fingerprint):
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name cut at the first C{' ('} and then at the first
        C{' <'}, e.g. C{'Alice (work) <a@b>'} becomes C{'Alice'}.
        @rtype: str"""
        without_comment = self.name.split(' (', 1)[0]
        without_address = without_comment.split(' <', 1)[0]
        return without_address
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    Every requested fingerprint gets an entry in the result; a key keeps its
    default name if gpg prints no C{uid:} record for it.
    @param fingerprints: the fingerprints to look up (any iterable of strings)
    @return: the loaded keys, indexed by fingerprint (empty if none were requested)
    @rtype: {str: L{Key}}
    @since: 0.27"""
    # Take a list copy so that tuples, sets and generators work too.
    fingerprints = list(fingerprints)

    keys = {}
    if not fingerprints:
        # Don't run gpg at all: with no key arguments the command line
        # below is a plain '--list-keys'.
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None

    cin, cout = os.popen2(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys', '--with-fingerprint'] + fingerprints)
    cin.close()
    try:
        for line in cout:
            if line.startswith('pub:'):
                # A new key record starts; its fingerprint is not known yet.
                current_fpr = None
            elif line.startswith('fpr:'):
                assert current_fpr is None
                current_fpr = line.split(':')[9]
            elif line.startswith('uid:'):
                assert current_fpr is not None
                parts = line.split(':')
                # Skip keys that were not requested under exactly this
                # fingerprint string instead of failing with KeyError.
                if current_fpr in keys:
                    # NOTE(review): a later uid record overwrites an earlier
                    # one, so the last uid listed for a key wins - confirm
                    # this is the intended choice of name.
                    keys[current_fpr].name = parts[9]
    finally:
        cout.close()

    return keys
def load_key(fingerprint):
    """Look up a single key by delegating to L{load_keys}.
    @param fingerprint: the fingerprint of the key to load
    @return: the entry L{load_keys} produced for this fingerprint
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    @param stream: source of the key data; must provide C{fileno()}
    @raise SafeException: if gpg wrote anything to its standard error
    @note: only gpg's stderr output is examined; its exit status is not."""
    errors = tempfile.TemporaryFile()
    try:
        child = os.fork()
        if child == 0:
            # We are the child
            try:
                try:
                    # Read the key from 'stream' and collect stderr in 'errors'.
                    os.dup2(stream.fileno(), 0)
                    os.dup2(errors.fileno(), 2)
                    os.execlp('gpg', 'gpg', '--no-secmem-warning', '--quiet', '--import')
                except:
                    # Deliberately broad: report whatever went wrong, then
                    # exit below rather than returning into the caller.
                    traceback.print_exc()
            finally:
                os._exit(1)
            assert False

        # We are the parent: wait for gpg, then see what it complained about.
        pid, status = os.waitpid(child, 0)
        assert pid == child

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file even if fork/waitpid/read failed.
        errors.close()

    if error_messages:
        raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
def _check_plain_stream(stream):
    """Run C{gpg --decrypt} over stream and collect the signatures it reports.
    gpg's standard output goes to a new temporary file and its status
    messages come back over a pipe.
    @param stream: the signed input; must provide C{fileno()}
    @return: (temporary file with gpg's output, rewound to the start;
    list of signatures)"""
    plain_out = tempfile.TemporaryFile()    # Python2.2 does not support 'prefix'
    gpg_errors = tempfile.TemporaryFile()

    read_end, write_end = os.pipe()

    pid = os.fork()

    if pid == 0:
        # Child process: wire up the descriptors and become gpg.
        try:
            try:
                os.close(read_end)
                os.dup2(stream.fileno(), 0)
                os.dup2(plain_out.fileno(), 1)
                os.dup2(gpg_errors.fileno(), 2)
                os.execlp('gpg', 'gpg', '--no-secmem-warning', '--decrypt',
                      # Not all versions support this:
                      #'--max-output', str(1024 * 1024),
                      '--batch',
                      '--status-fd', str(write_end))
            except:
                traceback.print_exc()
        finally:
            # Never fall back into the caller's code in the child.
            os._exit(1)
        assert False

    # Parent process: only the child writes to the status pipe.
    os.close(write_end)

    try:
        found = _get_sigs_from_gpg_status_stream(read_end, pid, gpg_errors)
    finally:
        # Rewind the output whether or not signature collection succeeded.
        plain_out.seek(0)
    return (plain_out, found)
def _check_xml_stream(stream):
    """Check an XML document whose signature is stored, base64-encoded, in a
    trailing C{<!-- Base64 Signature ... -->} comment.
    Everything before the last such comment is the signed data; the decoded
    comment body is written to a temporary file and both are passed to
    C{gpg --verify}.
    @param stream: the document; must provide C{read()}, C{seek()} and C{fileno()}
    @return: (the stream that was passed in, rewound to the start;
    list of signatures)
    @raise SafeException: if the signature block is missing or malformed"""
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException("No signature block in XML. Maybe this file isn't signed?")
    last_comment += 1    # Include new-line in data

    # Validate and decode the signature block before creating any
    # temporary files, so malformed input leaves nothing to clean up.
    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException('Bad signature block: extra data on comment line')
    # Drop trailing blank lines (the first line is non-blank, so the list
    # cannot become empty here).
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException('Bad signature block: last line is not end-of-comment')
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException("Invalid characters found in base 64 encoded signature")
    try:
        sig_data = base64.decodestring(sig_data)    # (b64decode is Python 2.4)
    except Exception as ex:    # 'as' form: parses on Python 2.6+ and 3.x
        raise SafeException("Invalid base 64 encoded signature: " + str(ex))

    data = tempfile.TemporaryFile()
    errors = tempfile.TemporaryFile()
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the underlying descriptor: that is what the child reads from.
        os.lseek(data.fileno(), 0, 0)

        sig_fd, sig_name = tempfile.mkstemp(prefix = 'injector-sig-')
        try:
            sig_file = os.fdopen(sig_fd, 'w')
            sig_file.write(sig_data)
            sig_file.close()

            status_r, status_w = os.pipe()

            child = os.fork()

            if child == 0:
                # We are the child
                try:
                    try:
                        os.close(status_r)
                        os.dup2(data.fileno(), 0)
                        os.dup2(errors.fileno(), 2)
                        os.execlp('gpg', 'gpg', '--no-secmem-warning',
                              # Not all versions support this:
                              #'--max-output', str(1024 * 1024),
                              '--batch',
                              '--status-fd', str(status_w),
                              '--verify', sig_name, '-')
                    except:
                        traceback.print_exc()
                finally:
                    # Exits immediately; the parent's cleanup below is
                    # never run in the child.
                    os._exit(1)
                assert False

            # We are the parent
            os.close(status_w)

            try:
                sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
            finally:
                os.lseek(stream.fileno(), 0, 0)
                stream.seek(0)
        finally:
            os.unlink(sig_name)
    finally:
        # Closing is harmless if a file has already been closed elsewhere.
        data.close()
        errors.close()
    return (stream, sigs)
def _find_in_path(prog):
    """Search the directories of the executable search path for prog.
    Uses C{$PATH}, or C{os.defpath} when C{$PATH} is not set, and splits it
    on the platform's path separator.
    @param prog: file name to look for
    @return: the first path at which prog exists as a regular file, or None
    @note: only existence as a file is checked, not execute permission."""
    search_path = os.environ.get('PATH', os.defpath)
    for d in search_path.split(os.pathsep):
        path = os.path.join(d, prog)
        if os.path.isfile(path):
            return path
    return None
def check_stream(stream):
    """Check the signatures on stream and return its data.
    If stream starts with "<?xml " then the signature is taken from a
    comment at the end (and the returned data is the original stream).
    Otherwise stream is passed through gpg --decrypt to get the data.
    stream must be seekable and must provide C{fileno()}.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if no 'gpg' is found in $PATH"""
    if not _find_in_path('gpg'):
        raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")

    # Peek at the first few bytes to decide which format this is.
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)
    if start == "<?xml ":
        return _check_xml_stream(stream)
    else:
        # Rewind the descriptor itself: the plain-stream checker hands
        # stream.fileno() to gpg as its stdin.
        os.lseek(stream.fileno(), 0, 0)
        return _check_plain_stream(stream)
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).
    @param status_r: file descriptor to read gpg's status lines from;
    it is closed before returning
    @param child: process ID to wait for
    @param errors: file holding gpg's error output; it is read and closed
    @return: the signatures found, in the order they were reported
    @raise SafeException: if no signature was reported"""
    # Status codes that describe a signature, and the result class for each.
    sig_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }

    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    status_stream = os.fdopen(status_r)
    try:
        for line in status_stream:
            assert line.endswith('\n')
            assert line.startswith('[GNUPG:] ')
            # Strip the '[GNUPG:] ' prefix and the trailing newline.
            line = line[9:-1]
            split_line = line.split(' ')
            code = split_line[0]
            args = split_line[1:]
            sig_type = sig_types.get(code)
            if sig_type is not None:
                sigs.append(sig_type(args))
    finally:
        # Close the pipe explicitly instead of leaving it to the
        # garbage collector.
        status_stream.close()

    pid, status = os.waitpid(child, 0)
    assert pid == child

    errors.seek(0)

    error_messages = errors.read().strip()
    errors.close()

    if not sigs:
        if error_messages:
            raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
        else:
            raise SafeException("No signatures found. No error messages from GPG.")

    return sigs