2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
9 # Copyright (C) 2009, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
12 from zeroinstall
import _
17 from logging
import info
, warn
19 from zeroinstall
.support
import find_in_path
, basedir
20 from zeroinstall
.injector
.trust
import trust_db
21 from zeroinstall
.injector
.model
import SafeException
# Base command line used for every GnuPG invocation in this module.
_gnupg_options = ['gpg', '--no-secmem-warning']

# The hasattr() probe guards against platforms where os.geteuid is missing.
if 'GNUPGHOME' not in os.environ and hasattr(os, 'geteuid'):
    if os.geteuid() == 0:
        # Effective UID 0 and no explicit GNUPGHOME: pass --homedir explicitly.
        _gnupg_options.extend(['--homedir', os.path.join(basedir.home, '.gnupg')])
        info(_("Running as root, so setting GnuPG home to %s"), _gnupg_options[-1])
28 class Signature(object):
29 """Abstract base class for signature check results."""
32 def __init__(self
, status
):
35 def is_trusted(self
, domain
= None):
36 """Whether this signature is trusted by the user."""
40 """Returns the ID of the key that must be downloaded to check this signature."""
43 class ValidSig(Signature
):
44 """A valid signature check result."""
49 return "Valid signature from " + self
.status
[self
.FINGERPRINT
]
def is_trusted(self, domain=None):
    """Ask L{trust.trust_db} whether this signature's key is trusted.

    @param domain: passed through unchanged to C{trust_db.is_trusted}
    @return: the result of C{trust_db.is_trusted} for this fingerprint and domain"""
    key_fingerprint = self.status[self.FINGERPRINT]
    return trust_db.is_trusted(key_fingerprint, domain)
def get_timestamp(self):
    """Return the time this signature was made, as an integer.

    The value is the C{TIMESTAMP} field of the stored status data,
    converted with int()."""
    raw_timestamp = self.status[self.TIMESTAMP]
    return int(raw_timestamp)
@property
def fingerprint(self):
    # Read-only view of the FINGERPRINT field of the stored status data.
    return self.status[self.FINGERPRINT]
61 def get_details(self
):
62 """Call 'gpg --list-keys' and return the results split into lines and columns.
64 child
= subprocess
.Popen(_gnupg_options
+ ['--with-colons', '--list-keys', self
.fingerprint
], stdout
= subprocess
.PIPE
)
65 cout
, unused
= child
.communicate()
67 info(_("GPG exited with code %d") % child
.returncode
)
69 for line
in cout
.split('\n'):
70 details
.append(line
.split(':'))
73 class BadSig(Signature
):
74 """A bad signature (doesn't match the message)."""
78 return _("BAD signature by %s (the message has been tampered with)") \
79 % self
.status
[self
.KEYID
]
81 class ErrSig(Signature
):
82 """Error while checking a signature."""
88 msg
= _("ERROR signature by %s: ") % self
.status
[self
.KEYID
]
89 rc
= int(self
.status
[self
.RC
])
91 msg
+= _("Unknown or unsupported algorithm '%s'") % self
.status
[self
.ALG
]
93 msg
+= _("Unknown key. Try 'gpg --recv-key %s'") % self
.status
[self
.KEYID
]
95 msg
+= _("Unknown reason code %d") % rc
99 rc
= int(self
.status
[self
.RC
])
101 return self
.status
[self
.KEYID
]
107 @param fingerprint: the fingerprint of the key
108 @type fingerprint: str
109 @ivar name: a short name for the key, extracted from the full name
def __init__(self, fingerprint):
    """Create a key record for the given fingerprint.

    @param fingerprint: the fingerprint of the key
    @type fingerprint: str"""
    # Placeholder name; nothing has been looked up at construction time.
    self.name = '(unknown)'
    self.fingerprint = fingerprint
def get_short_name(self):
    """Return C{self.name} cut at the first " (" and then at the first " <".

    If neither marker occurs, the name is returned unchanged."""
    before_paren = self.name.partition(' (')[0]
    return before_paren.partition(' <')[0]
119 def load_keys(fingerprints
):
120 """Load a set of keys at once.
121 This is much more efficient than making individual calls to L{load_key}.
@return: a dictionary of loaded keys, indexed by fingerprint
123 @rtype: {str: L{Key}}
128 # Otherwise GnuPG returns everything...
129 if not fingerprints
: return keys
131 for fp
in fingerprints
:
137 child
= subprocess
.Popen(_gnupg_options
+ ['--fixed-list-mode', '--with-colons', '--list-keys',
138 '--with-fingerprint', '--with-fingerprint'] + fingerprints
, stdout
= subprocess
.PIPE
)
140 for line
in child
.stdout
:
141 if line
.startswith('pub:'):
144 if line
.startswith('fpr:'):
145 current_fpr
= line
.split(':')[9]
146 if current_fpr
in keys
and current_uid
:
147 # This is probably a subordinate key, where the fingerprint
148 # comes after the uid, not before. Note: we assume the subkey is
# cross-certified, as recent ones always are.
150 keys
[current_fpr
].name
= current_uid
151 if line
.startswith('uid:'):
152 assert current_fpr
is not None
153 # Only take primary UID
154 if current_uid
: continue
155 parts
= line
.split(':')
156 current_uid
= parts
[9]
157 if current_fpr
in keys
:
158 keys
[current_fpr
].name
= current_uid
161 warn(_("gpg --list-keys failed with exit code %d") % child
.returncode
)
165 def load_key(fingerprint
):
166 """Query gpg for information about this key.
170 return load_keys([fingerprint
])[fingerprint
]
172 def import_key(stream
):
173 """Run C{gpg --import} with this stream as stdin."""
174 errors
= tempfile
.TemporaryFile()
176 child
= subprocess
.Popen(_gnupg_options
+ ['--quiet', '--import'],
177 stdin
= stream
, stderr
= errors
)
179 status
= child
.wait()
182 error_messages
= errors
.read().strip()
186 raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages
)
188 def _check_plain_stream(stream
):
189 data
= tempfile
.TemporaryFile() # Python2.2 does not support 'prefix'
190 errors
= tempfile
.TemporaryFile()
192 status_r
, status_w
= os
.pipe()
194 # Note: Should ideally close status_r in the child, but we want to support Windows too
195 child
= subprocess
.Popen(_gnupg_options
+ ['--decrypt',
196 # Not all versions support this:
197 #'--max-output', str(1024 * 1024),
199 '--status-fd', str(status_w
)],
207 sigs
= _get_sigs_from_gpg_status_stream(os
.fdopen(status_r
), child
, errors
)
212 def _check_xml_stream(stream
):
213 xml_comment_start
= '<!-- Base64 Signature'
215 data_to_check
= stream
.read()
217 last_comment
= data_to_check
.rfind('\n' + xml_comment_start
)
219 raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
220 last_comment
+= 1 # Include new-line in data
222 data
= tempfile
.TemporaryFile()
223 data
.write(data_to_check
[:last_comment
])
225 os
.lseek(data
.fileno(), 0, 0)
227 errors
= tempfile
.TemporaryFile()
229 sig_lines
= data_to_check
[last_comment
:].split('\n')
230 if sig_lines
[0].strip() != xml_comment_start
:
231 raise SafeException(_('Bad signature block: extra data on comment line'))
232 while sig_lines
and not sig_lines
[-1].strip():
234 if sig_lines
[-1].strip() != '-->':
235 raise SafeException(_('Bad signature block: last line is not end-of-comment'))
236 sig_data
= '\n'.join(sig_lines
[1:-1])
238 if re
.match('^[ A-Za-z0-9+/=\n]+$', sig_data
) is None:
239 raise SafeException(_("Invalid characters found in base 64 encoded signature"))
241 sig_data
= base64
.decodestring(sig_data
) # (b64decode is Python 2.4)
242 except Exception, ex
:
243 raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex
))
245 sig_fd
, sig_name
= tempfile
.mkstemp(prefix
= 'injector-sig-')
247 sig_file
= os
.fdopen(sig_fd
, 'w')
248 sig_file
.write(sig_data
)
251 # Note: Should ideally close status_r in the child, but we want to support Windows too
252 child
= subprocess
.Popen(_gnupg_options
+ [
253 # Not all versions support this:
254 #'--max-output', str(1024 * 1024),
256 # Windows GPG can only cope with "1" here
258 '--verify', sig_name
, '-'],
260 stdout
= subprocess
.PIPE
,
264 sigs
= _get_sigs_from_gpg_status_stream(child
.stdout
, child
, errors
)
266 os
.lseek(stream
.fileno(), 0, 0)
270 return (stream
, sigs
)
272 def check_stream(stream
):
273 """Pass stream through gpg --decrypt to get the data, the error text,
274 and a list of signatures (good or bad). If stream starts with "<?xml "
275 then get the signature from a comment at the end instead (and the returned
276 data is the original stream). stream must be seekable.
277 @note: Stream returned may or may not be the one passed in. Be careful!
278 @return: (data_stream, [Signatures])"""
279 if not find_in_path('gpg'):
280 raise SafeException(_("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org"))
286 start
= stream
.read(6)
288 if start
== "<?xml ":
289 return _check_xml_stream(stream
)
290 elif start
== '-----B':
292 warnings
.warn(_("Plain GPG-signed feeds are deprecated!"), DeprecationWarning, stacklevel
= 2)
293 os
.lseek(stream
.fileno(), 0, 0)
294 return _check_plain_stream(stream
)
296 raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream
.read(120)))
298 def _get_sigs_from_gpg_status_stream(status_r
, child
, errors
):
299 """Read messages from status_r and collect signatures from it.
300 When done, reap 'child'.
301 If there are no signatures, throw SafeException (using errors
302 for the error message if non-empty)."""
305 # Should we error out on bad signatures, even if there's a good
308 for line
in status_r
:
309 assert line
.endswith('\n')
310 assert line
.startswith('[GNUPG:] ')
312 split_line
= line
.split(' ')
314 args
= split_line
[1:]
315 if code
== 'VALIDSIG':
316 sigs
.append(ValidSig(args
))
317 elif code
== 'BADSIG':
318 sigs
.append(BadSig(args
))
319 elif code
== 'ERRSIG':
320 sigs
.append(ErrSig(args
))
322 status
= child
.wait()
326 error_messages
= errors
.read().strip()
331 raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages
)
333 raise SafeException(_("No signatures found. No error messages from GPG."))