Log using the "0install" logger rather than "root"
[zeroinstall/solver.git] / zeroinstall / injector / gpg.py
blobbb619c38c3aa3456415a8bbb89ad899b0a5f851f
1 """
2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
7 """
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 from zeroinstall import _, logger
13 import subprocess
14 import base64, re
15 import os
16 import tempfile
18 from zeroinstall.support import find_in_path, basedir
19 from zeroinstall.injector.trust import trust_db
20 from zeroinstall.injector.model import SafeException
22 _gnupg_options = None
23 def _run_gpg(args, **kwargs):
24 global _gnupg_options
25 if _gnupg_options is None:
26 gpg_path = os.environ.get('ZEROINSTALL_GPG') or find_in_path('gpg') or find_in_path('gpg2') or 'gpg'
27 _gnupg_options = [gpg_path, '--no-secmem-warning']
29 if hasattr(os, 'geteuid') and os.geteuid() == 0 and 'GNUPGHOME' not in os.environ:
30 _gnupg_options += ['--homedir', os.path.join(basedir.home, '.gnupg')]
31 logger.info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1])
33 return subprocess.Popen(_gnupg_options + args, universal_newlines = True, **kwargs)
35 class Signature(object):
36 """Abstract base class for signature check results.
37 @ivar status: the raw data returned by GPG
38 @ivar messages: any messages printed by GPG which may be relevant to this signature
39 """
40 status = None
41 messages = None
43 def __init__(self, status):
44 self.status = status
46 def is_trusted(self, domain = None):
47 """Whether this signature is trusted by the user."""
48 return False
50 def need_key(self):
51 """Returns the ID of the key that must be downloaded to check this signature."""
52 return None
54 class ValidSig(Signature):
55 """A valid signature check result."""
56 FINGERPRINT = 0
57 TIMESTAMP = 2
59 def __str__(self):
60 return "Valid signature from " + self.status[self.FINGERPRINT]
62 def is_trusted(self, domain = None):
63 """Asks the L{trust.trust_db}."""
64 return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)
66 def get_timestamp(self):
67 """Get the time this signature was made."""
68 return int(self.status[self.TIMESTAMP])
70 fingerprint = property(lambda self: self.status[self.FINGERPRINT])
72 def get_details(self):
73 """Call 'gpg --list-keys' and return the results split into lines and columns.
74 @rtype: [[str]]"""
75 # Note: GnuPG 2 always uses --fixed-list-mode
76 child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint], stdout = subprocess.PIPE)
77 cout, unused = child.communicate()
78 if child.returncode:
79 logger.info(_("GPG exited with code %d") % child.returncode)
80 details = []
81 for line in cout.split('\n'):
82 details.append(line.split(':'))
83 return details
85 class BadSig(Signature):
86 """A bad signature (doesn't match the message)."""
87 KEYID = 0
89 def __str__(self):
90 return _("BAD signature by %s (the message has been tampered with)") \
91 % self.status[self.KEYID]
93 class ErrSig(Signature):
94 """Error while checking a signature."""
95 KEYID = 0
96 ALG = 1
97 RC = -1
99 def __str__(self):
100 msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
101 rc = int(self.status[self.RC])
102 if rc == 4:
103 msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
104 elif rc == 9:
105 msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
106 else:
107 msg += _("Unknown reason code %d") % rc
108 return msg
110 def need_key(self):
111 rc = int(self.status[self.RC])
112 if rc == 9:
113 return self.status[self.KEYID]
114 return None
116 class Key:
117 """A GPG key.
118 @since: 0.27
119 @param fingerprint: the fingerprint of the key
120 @type fingerprint: str
121 @ivar name: a short name for the key, extracted from the full name
122 @type name: str
124 def __init__(self, fingerprint):
125 self.fingerprint = fingerprint
126 self.name = '(unknown)'
128 def get_short_name(self):
129 return self.name.split(' (', 1)[0].split(' <', 1)[0]
131 def load_keys(fingerprints):
132 """Load a set of keys at once.
133 This is much more efficient than making individual calls to L{load_key}.
134 @return: a list of loaded keys, indexed by fingerprint
135 @rtype: {str: L{Key}}
136 @since: 0.27"""
137 import codecs
139 keys = {}
141 # Otherwise GnuPG returns everything...
142 if not fingerprints: return keys
144 for fp in fingerprints:
145 keys[fp] = Key(fp)
147 current_fpr = None
148 current_uid = None
150 child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
151 '--with-fingerprint', '--with-fingerprint'] + fingerprints, stdout = subprocess.PIPE)
152 try:
153 for line in child.stdout:
154 if line.startswith('pub:'):
155 current_fpr = None
156 current_uid = None
157 if line.startswith('fpr:'):
158 current_fpr = line.split(':')[9]
159 if current_fpr in keys and current_uid:
160 # This is probably a subordinate key, where the fingerprint
161 # comes after the uid, not before. Note: we assume the subkey is
162 # cross-certified, as recent always ones are.
163 try:
164 keys[current_fpr].name = codecs.decode(current_uid, 'utf-8')
165 except:
166 logger.warn("Not UTF-8: %s", current_uid)
167 keys[current_fpr].name = current_uid
168 if line.startswith('uid:'):
169 assert current_fpr is not None
170 # Only take primary UID
171 if current_uid: continue
172 parts = line.split(':')
173 current_uid = parts[9]
174 if current_fpr in keys:
175 keys[current_fpr].name = current_uid
176 finally:
177 child.stdout.close()
179 if child.wait():
180 logger.warn(_("gpg --list-keys failed with exit code %d") % child.returncode)
182 return keys
184 def load_key(fingerprint):
185 """Query gpg for information about this key.
186 @return: a new key
187 @rtype: L{Key}
188 @since: 0.27"""
189 return load_keys([fingerprint])[fingerprint]
191 def import_key(stream):
192 """Run C{gpg --import} with this stream as stdin."""
193 with tempfile.TemporaryFile(mode = 'w+t') as errors:
194 child = _run_gpg(['--quiet', '--import', '--batch'],
195 stdin = stream, stderr = errors)
197 status = child.wait()
199 errors.seek(0)
200 error_messages = errors.read().strip()
202 if status != 0:
203 if error_messages:
204 raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
205 else:
206 raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)
207 elif error_messages:
208 logger.warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
210 def _check_xml_stream(stream):
211 xml_comment_start = b'<!-- Base64 Signature'
213 data_to_check = stream.read()
215 last_comment = data_to_check.rfind(b'\n' + xml_comment_start)
216 if last_comment < 0:
217 raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
218 last_comment += 1 # Include new-line in data
220 # Copy the file to 'data', without the signature
221 # Copy the signature to 'sig'
223 with tempfile.TemporaryFile(mode = 'w+b') as data:
224 data.write(data_to_check[:last_comment])
225 data.flush()
226 os.lseek(data.fileno(), 0, 0)
228 with tempfile.TemporaryFile('w+t') as errors:
229 sig_lines = data_to_check[last_comment:].split(b'\n')
230 if sig_lines[0].strip() != xml_comment_start:
231 raise SafeException(_('Bad signature block: extra data on comment line'))
232 while sig_lines and not sig_lines[-1].strip():
233 del sig_lines[-1]
234 if sig_lines[-1].strip() != b'-->':
235 raise SafeException(_('Bad signature block: last line is not end-of-comment'))
236 sig_data = b'\n'.join(sig_lines[1:-1])
238 if re.match(b'^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
239 raise SafeException(_("Invalid characters found in base 64 encoded signature"))
240 try:
241 if hasattr(base64, 'decodebytes'):
242 sig_data = base64.decodebytes(sig_data) # Python 3
243 else:
244 sig_data = base64.decodestring(sig_data) # Python 2
245 except Exception as ex:
246 raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))
248 with tempfile.NamedTemporaryFile(prefix = 'injector-sig-', mode = 'wb', delete = False) as sig_file:
249 sig_file.write(sig_data)
251 try:
252 # Note: Should ideally close status_r in the child, but we want to support Windows too
253 child = _run_gpg([# Not all versions support this:
254 #'--max-output', str(1024 * 1024),
255 '--batch',
256 # Windows GPG can only cope with "1" here
257 '--status-fd', '1',
258 # Don't try to download missing keys; we'll do that
259 '--keyserver-options', 'no-auto-key-retrieve',
260 '--verify', sig_file.name, '-'],
261 stdin = data,
262 stdout = subprocess.PIPE,
263 stderr = errors)
265 try:
266 sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
267 finally:
268 os.lseek(stream.fileno(), 0, 0)
269 errors.close()
270 child.stdout.close()
271 child.wait()
272 stream.seek(0)
273 finally:
274 os.unlink(sig_file.name)
275 return (stream, sigs)
277 def check_stream(stream):
278 """Pass stream through gpg --decrypt to get the data, the error text,
279 and a list of signatures (good or bad). If stream starts with "<?xml "
280 then get the signature from a comment at the end instead (and the returned
281 data is the original stream). stream must be seekable.
282 @note: Stream returned may or may not be the one passed in. Be careful!
283 @return: (data_stream, [Signatures])"""
285 stream.seek(0)
287 start = stream.read(6)
288 stream.seek(0)
289 if start == b"<?xml ":
290 return _check_xml_stream(stream)
291 elif start == b'-----B':
292 raise SafeException(_("Plain GPG-signed feeds no longer supported"))
293 else:
294 raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
296 def _get_sigs_from_gpg_status_stream(status_r, child, errors):
297 """Read messages from status_r and collect signatures from it.
298 When done, reap 'child'.
299 If there are no signatures, throw SafeException (using errors
300 for the error message if non-empty)."""
301 sigs = []
303 # Should we error out on bad signatures, even if there's a good
304 # signature too?
306 for line in status_r:
307 assert line.endswith('\n')
308 if not line.startswith('[GNUPG:] '):
309 # The docs says every line starts with this, but if auto-key-retrieve
310 # is on then they might not. See bug #3420548
311 logger.warn("Invalid output from GnuPG: %r", line)
312 continue
314 line = line[9:-1]
315 split_line = line.split(' ')
316 code = split_line[0]
317 args = split_line[1:]
318 if code == 'VALIDSIG':
319 sigs.append(ValidSig(args))
320 elif code == 'BADSIG':
321 sigs.append(BadSig(args))
322 elif code == 'ERRSIG':
323 sigs.append(ErrSig(args))
325 errors.seek(0)
327 error_messages = errors.read().strip()
329 if not sigs:
330 if error_messages:
331 raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
332 else:
333 raise SafeException(_("No signatures found. No error messages from GPG."))
334 elif error_messages:
335 # Attach the warnings to all the signatures, in case they're useful.
336 for s in sigs:
337 s.messages = error_messages
339 return sigs