More Python 3 support
[zeroinstall/solver.git] / zeroinstall / injector / gpg.py
blobef8388688187847514a079fd9a64dd7e16336505
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 from zeroinstall import _
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
17 from logging import info, warn
19 from zeroinstall.support import find_in_path, basedir
20 from zeroinstall.injector.trust import trust_db
21 from zeroinstall.injector.model import SafeException
# Cached base command line for GnuPG; filled in by the first _run_gpg() call.
_gnupg_options = None


def _run_gpg(args, **kwargs):
    """Start GnuPG with the given arguments and return the child process.

    The base command line (executable plus common options) is worked out on
    the first call and cached in the module-level C{_gnupg_options}.
    @param args: extra command-line arguments, appended after the common options
    @type args: [str]
    @param kwargs: passed straight through to C{subprocess.Popen}
    @return: the started child, opened with C{universal_newlines = True}
    @rtype: C{subprocess.Popen}"""
    global _gnupg_options
    if _gnupg_options is None:
        # $ZEROINSTALL_GPG takes priority, then whatever find_in_path() reports
        # for 'gpg' or 'gpg2', and finally the bare name 'gpg'.
        gpg_path = os.environ.get('ZEROINSTALL_GPG')
        if not gpg_path:
            gpg_path = find_in_path('gpg') or find_in_path('gpg2') or 'gpg'
        _gnupg_options = [gpg_path, '--no-secmem-warning']

        # geteuid() is missing on some platforms, hence the hasattr() guard.
        running_as_root = hasattr(os, 'geteuid') and os.geteuid() == 0
        if running_as_root and 'GNUPGHOME' not in os.environ:
            gnupg_home = os.path.join(basedir.home, '.gnupg')
            _gnupg_options.extend(['--homedir', gnupg_home])
            info(_("Running as root, so setting GnuPG home to %s"), gnupg_home)

    return subprocess.Popen(_gnupg_options + args, universal_newlines = True, **kwargs)
class Signature(object):
    """Base class for the outcome of checking a single signature.

    The defaults defined here describe a signature that is not trusted and
    that does not require any key to be fetched; subclasses override
    L{is_trusted} and L{need_key} where that is not the case.
    @ivar status: the raw data returned by GPG
    @ivar messages: any messages printed by GPG which may be relevant to this signature
    """
    messages = None
    status = None

    def __init__(self, status):
        self.status = status

    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature.
        The base implementation never needs one, so it returns None."""
        return None

    def is_trusted(self, domain = None):
        """Whether this signature is trusted by the user.
        The base implementation ignores C{domain} and always answers False."""
        return False
class ValidSig(Signature):
    """The result of a signature check that GPG reported as valid.

    C{status} holds the fields of the status line; the class constants
    below are the indexes of the fields used here."""
    FINGERPRINT = 0
    TIMESTAMP = 2

    @property
    def fingerprint(self):
        return self.status[self.FINGERPRINT]

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain = None):
        """Asks the L{trust.trust_db} whether this fingerprint is trusted for C{domain}."""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_timestamp(self):
        """Get the time this signature was made, as an integer."""
        return int(self.status[self.TIMESTAMP])

    def get_details(self):
        """Call 'gpg --list-keys' for this fingerprint and return its output,
        split into lines and then into colon-separated columns.
        A non-zero exit status from GPG is only logged, not raised.
        @rtype: [[str]]"""
        # Note: GnuPG 2 always uses --fixed-list-mode
        gpg_args = ['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint]
        child = _run_gpg(gpg_args, stdout = subprocess.PIPE)
        listing = child.communicate()[0]
        if child.returncode:
            info(_("GPG exited with code %d") % child.returncode)
        return [row.split(':') for row in listing.split('\n')]
class BadSig(Signature):
    """A signature that does not match the message it claims to sign."""
    # Index of the key ID within the status fields
    KEYID = 0

    def __str__(self):
        template = _("BAD signature by %s (the message has been tampered with)")
        return template % self.status[self.KEYID]
class ErrSig(Signature):
    """A signature that could not be checked because of an error.

    The class constants are indexes into C{status}: the key ID, the
    algorithm, and (as the last field) the numeric reason code."""
    KEYID = 0
    ALG = 1
    RC = -1

    def __str__(self):
        key_id = self.status[self.KEYID]
        prefix = _("ERROR signature by %s: ") % key_id
        rc = int(self.status[self.RC])
        if rc == 4:
            return prefix + _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        if rc == 9:
            return prefix + _("Unknown key. Try 'gpg --recv-key %s'") % key_id
        return prefix + _("Unknown reason code %d") % rc

    def need_key(self):
        """Return the key ID when the reason code is 9 (reported by
        L{__str__} as an unknown key), otherwise None."""
        if int(self.status[self.RC]) == 9:
            return self.status[self.KEYID]
        return None
class Key(object):
    """A GPG key.
    @since: 0.27
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: a short name for the key, extracted from the full name;
    starts out as C{'(unknown)'} until something fills it in
    @type name: str
    """

    def __init__(self, fingerprint):
        self.fingerprint = fingerprint
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with any trailing ' (comment)' and ' <email>' parts removed.
        @rtype: str"""
        # Cut at the first " (" and then at the first " <", keeping what comes before.
        return self.name.split(' (', 1)[0].split(' <', 1)[0]
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    Every requested fingerprint gets an entry in the result; keys that GnuPG
    does not report keep the placeholder name set by L{Key}.
    @param fingerprints: the fingerprints to look up (any iterable of strings)
    @return: the loaded keys, indexed by fingerprint
    @rtype: {str: L{Key}}
    @since: 0.27"""
    import codecs

    # Take a list copy so that sets, tuples and generators are accepted too;
    # the value is both iterated and appended to the command line below.
    fingerprints = list(fingerprints)

    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints: return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    # --with-fingerprint is passed twice; presumably so that subkey
    # fingerprints are listed as well (see the subordinate-key case below).
    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + fingerprints, stdout = subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new key record starts: forget the previous key's state
                current_fpr = None
                current_uid = None
            if line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    name = current_uid
                    # Only byte strings (Python 2) need decoding; on Python 3 the
                    # pipe already yields text and decoding it would just fail.
                    if isinstance(name, bytes):
                        try:
                            name = codecs.decode(name, 'utf-8')
                        except UnicodeError:
                            warn("Not UTF-8: %s", current_uid)
                    keys[current_fpr].name = name
            if line.startswith('uid:'):
                assert current_fpr is not None
                # Only take primary UID
                if current_uid: continue
                parts = line.split(':')
                current_uid = parts[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        child.stdout.close()
        # Always reap the child, even if parsing the listing failed
        exit_code = child.wait()

    if exit_code:
        warn(_("gpg --list-keys failed with exit code %d") % exit_code)

    return keys
def load_key(fingerprint):
    """Query gpg for information about a single key.
    This is a convenience wrapper around L{load_keys}.
    @param fingerprint: the fingerprint of the key to look up
    @return: a new key
    @rtype: L{Key}
    @since: 0.27"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.

    Anything GPG writes to stderr is captured. If GPG succeeds the text is
    logged as a warning; if it fails the text becomes the error message.
    @param stream: passed to the child process as its standard input
    @raise SafeException: if GPG exits with a non-zero status"""
    with tempfile.TemporaryFile(mode = 'w+t') as errors:
        gpg_args = ['--quiet', '--import', '--batch']
        child = _run_gpg(gpg_args, stdin = stream, stderr = errors)
        status = child.wait()

        # Rewind to read back whatever the child wrote to stderr
        errors.seek(0)
        error_messages = errors.read().strip()

    if status == 0:
        if error_messages:
            warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
        return

    if error_messages:
        raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
    raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
def _extract_signature(feed_bytes):
    """Split a signed XML feed into the signed data and its decoded signature.

    The signature is expected in a trailing comment block that starts with a
    line '<!-- Base64 Signature', ends with a line '-->', and contains the
    base 64 encoded signature in between.
    @param feed_bytes: the complete contents of the feed
    @type feed_bytes: bytes
    @return: (signed data up to and including the new-line before the
    comment, decoded signature)
    @rtype: (bytes, bytes)
    @raise SafeException: if the block is missing or malformed"""
    xml_comment_start = b'<!-- Base64 Signature'

    last_comment = feed_bytes.rfind(b'\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1   # Include new-line in data

    sig_lines = feed_bytes[last_comment:].split(b'\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    # Ignore blank lines after the end of the comment
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != b'-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = b'\n'.join(sig_lines[1:-1])

    if re.match(b'^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        if hasattr(base64, 'decodebytes'):
            sig_data = base64.decodebytes(sig_data)   # Python 3
        else:
            sig_data = base64.decodestring(sig_data)  # Python 2
    except Exception as ex:
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    return feed_bytes[:last_comment], sig_data

def _check_xml_stream(stream):
    """Verify the signature embedded in an XML feed.

    The feed (minus its signature block) and the decoded signature are
    written to temporary files and handed to 'gpg --verify'.
    @param stream: a seekable binary stream containing the whole feed; it is
    read to the end and rewound to the start before returning
    @return: (stream, [Signatures]) - the stream is the one passed in
    @raise SafeException: if the signature block is missing or malformed, or
    if GPG reports no signatures"""
    signed_data, sig_data = _extract_signature(stream.read())

    # Copy the file to 'data', without the signature
    # Copy the signature to 'sig_file'

    with tempfile.TemporaryFile(mode = 'w+b') as data:
        data.write(signed_data)
        data.flush()
        # Rewind the descriptor itself: the child reads from it directly
        os.lseek(data.fileno(), 0, 0)

        with tempfile.TemporaryFile('w+t') as errors:
            # delete = False because gpg opens the file by name after we close it;
            # the finally clause below removes it, even if writing it fails.
            sig_file = tempfile.NamedTemporaryFile(prefix = 'injector-sig-', mode = 'wb', delete = False)
            try:
                with sig_file:
                    sig_file.write(sig_data)

                # Note: Should ideally close status_r in the child, but we want to support Windows too
                child = _run_gpg([# Not all versions support this:
                                  #'--max-output', str(1024 * 1024),
                                  '--batch',
                                  # Windows GPG can only cope with "1" here
                                  '--status-fd', '1',
                                  # Don't try to download missing keys; we'll do that
                                  '--keyserver-options', 'no-auto-key-retrieve',
                                  '--verify', sig_file.name, '-'],
                                 stdin = data,
                                 stdout = subprocess.PIPE,
                                 stderr = errors)

                try:
                    sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
                finally:
                    # The child only ever sees 'data', never 'stream', so the
                    # caller's stream just needs an ordinary seek. It need not
                    # be backed by a real file descriptor.
                    errors.close()
                    child.stdout.close()
                    child.wait()
                    stream.seek(0)
            finally:
                os.unlink(sig_file.name)
    return (stream, sigs)
def check_stream(stream):
    """Check the signatures on a feed held in a seekable binary stream.

    Only XML feeds (those starting with "<?xml ") are accepted; their
    signature is taken from a comment at the end of the document. Feeds that
    begin like a plain GPG-signed message, or like anything else, are rejected.
    @param stream: the feed to check; it is rewound before being examined
    @note: Use the returned stream rather than assuming it is the one passed in.
    @return: (data_stream, [Signatures])
    @raise SafeException: if the stream does not start with "<?xml ", or if
    checking the XML signature fails"""
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == b"<?xml ":
        return _check_xml_stream(stream)

    if start == b'-----B':
        raise SafeException(_("Plain GPG-signed feeds no longer supported"))

    # Show the beginning of the data to help diagnose what was downloaded
    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.

    Lines of the form '[GNUPG:] CODE args...' with a code of VALIDSIG, BADSIG
    or ERRSIG each produce one signature object; other codes are ignored and
    lines without the prefix are logged and skipped.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).
    @param status_r: iterable of text lines from GPG's status output
    @param child: the GPG process; not used here - the caller is responsible
    for waiting for it
    @param errors: seekable text file holding GPG's stderr output
    @return: the signatures found, in the order reported
    @rtype: [L{Signature}]
    @raise SafeException: if no signatures were reported"""
    sig_classes = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    for line in status_r:
        if not line.startswith('[GNUPG:] '):
            # The docs says every line starts with this, but if auto-key-retrieve
            # is on then they might not. See bug #3420548
            warn("Invalid output from GnuPG: %r", line)
            continue

        line = line[9:]
        # Drop the line terminator if present. A truncated final line may
        # lack one, and must not lose its last character or abort the check.
        if line.endswith('\n'):
            line = line[:-1]

        split_line = line.split(' ')
        sig_class = sig_classes.get(split_line[0])
        if sig_class is not None:
            sigs.append(sig_class(split_line[1:]))

    errors.seek(0)

    error_messages = errors.read().strip()

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        else:
            raise SafeException(_("No signatures found. No error messages from GPG."))
    elif error_messages:
        # Attach the warnings to all the signatures, in case they're useful.
        for s in sigs:
            s.messages = error_messages

    return sigs