2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
9 # Copyright (C) 2006, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
16 from logging
import info
18 from zeroinstall
.support
import find_in_path
, basedir
19 from zeroinstall
.injector
.trust
import trust_db
20 from zeroinstall
.injector
.model
import SafeException
# Base command line shared by every GnuPG invocation in this module.
_gnupg_options = ['gpg', '--no-secmem-warning']

# When running as root without an explicit GNUPGHOME, point GnuPG at the
# '.gnupg' directory under basedir.home.
# os.geteuid() is only available on Unix-like platforms, so probe for it
# first; otherwise simply importing this module raises AttributeError there.
if hasattr(os, 'geteuid') and os.geteuid() == 0 and 'GNUPGHOME' not in os.environ:
    _gnupg_options += ['--homedir', os.path.join(basedir.home, '.gnupg')]
    info("Running as root, so setting GnuPG home to %s", _gnupg_options[-1])
27 class Signature(object):
28 """Abstract base class for signature check results."""
31 def __init__(self
, status
):
34 def is_trusted(self
, domain
= None):
35 """Whether this signature is trusted by the user."""
39 """Returns the ID of the key that must be downloaded to check this signature."""
42 class ValidSig(Signature
):
43 """A valid signature check result."""
48 return "Valid signature from " + self
.status
[self
.FINGERPRINT
]
def is_trusted(self, domain = None):
    """Asks the L{trust.trust_db} about this signature's key.

    The fingerprint field of the status is passed to trust_db.is_trusted
    together with domain, and that call's result is returned."""
    key_fingerprint = self.status[self.FINGERPRINT]
    return trust_db.is_trusted(key_fingerprint, domain)
def get_timestamp(self):
    """Get the time this signature was made.

    The value is the TIMESTAMP field of the status, converted to an int."""
    raw_time = self.status[self.TIMESTAMP]
    return int(raw_time)
@property
def fingerprint(self):
    """The FINGERPRINT field of this signature's status."""
    return self.status[self.FINGERPRINT]
60 def get_details(self
):
61 """Call 'gpg --list-keys' and return the results split into lines and columns.
63 child
= subprocess
.Popen(_gnupg_options
+ ['--with-colons', '--list-keys', self
.fingerprint
], stdout
= subprocess
.PIPE
)
64 cout
, unused
= child
.communicate()
66 info("GPG exited with code %d" % child
.returncode
)
68 for line
in cout
.split('\n'):
69 details
.append(line
.split(':'))
72 class BadSig(Signature
):
73 """A bad signature (doesn't match the message)."""
77 return "BAD signature by " + self
.status
[self
.KEYID
] + \
78 " (the message has been tampered with)"
80 class ErrSig(Signature
):
81 """Error while checking a signature."""
87 msg
= "ERROR signature by %s: " % self
.status
[self
.KEYID
]
88 rc
= int(self
.status
[self
.RC
])
90 msg
+= "Unknown or unsupported algorithm '%s'" % self
.status
[self
.ALG
]
92 msg
+= "Unknown key. Try 'gpg --recv-key %s'" % self
.status
[self
.KEYID
]
94 msg
+= "Unknown reason code %d" % rc
98 rc
= int(self
.status
[self
.RC
])
100 return self
.status
[self
.KEYID
]
106 @param fingerprint: the fingerprint of the key
107 @type fingerprint: str
108 @ivar name: a short name for the key, extracted from the full name
def __init__(self, fingerprint):
    """Record the key's fingerprint.

    The name starts out as the placeholder '(unknown)'.
    @param fingerprint: the fingerprint of the key"""
    self.name = '(unknown)'
    self.fingerprint = fingerprint
def get_short_name(self):
    """Return the name cut off at the first ' (' and then at the first ' <'.

    A name containing neither marker is returned unchanged."""
    short = self.name
    for marker in (' (', ' <'):
        # Keep only the text before the first occurrence of the marker.
        short = short.split(marker, 1)[0]
    return short
118 def load_keys(fingerprints
):
119 """Load a set of keys at once.
120 This is much more efficient than making individual calls to L{load_key}.
121 @return: a dictionary of loaded keys, indexed by fingerprint
122 @rtype: {str: L{Key}}
127 # Otherwise GnuPG returns everything...
128 if not fingerprints
: return keys
130 for fp
in fingerprints
:
136 cin
, cout
= os
.popen2(_gnupg_options
+ ['--fixed-list-mode', '--with-colons', '--list-keys',
137 '--with-fingerprint', '--with-fingerprint'] + fingerprints
)
141 if line
.startswith('pub:'):
144 if line
.startswith('fpr:'):
145 current_fpr
= line
.split(':')[9]
146 if current_fpr
in keys
and current_uid
:
147 # This is probably a subordinate key, where the fingerprint
148 # comes after the uid, not before. Note: we assume the subkey is
149 # cross-certified, as recent ones always are.
150 keys
[current_fpr
].name
= current_uid
151 if line
.startswith('uid:'):
152 assert current_fpr
is not None
153 # Only take primary UID
154 if current_uid
: continue
155 parts
= line
.split(':')
156 current_uid
= parts
[9]
157 if current_fpr
in keys
:
158 keys
[current_fpr
].name
= current_uid
def load_key(fingerprint):
    """Query gpg for information about this key.

    Delegates to L{load_keys} with a one-element list and returns the
    entry stored under fingerprint in its result.
    @param fingerprint: the fingerprint of the key to look up
    @rtype: L{Key}"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
171 def import_key(stream
):
172 """Run C{gpg --import} with this stream as stdin."""
173 errors
= tempfile
.TemporaryFile()
175 child
= subprocess
.Popen(_gnupg_options
+ ['--quiet', '--import'],
176 stdin
= stream
, stderr
= errors
)
178 status
= child
.wait()
181 error_messages
= errors
.read().strip()
185 raise SafeException("Errors from 'gpg --import':\n%s" % error_messages
)
187 def _check_plain_stream(stream
):
188 data
= tempfile
.TemporaryFile() # Python2.2 does not support 'prefix'
189 errors
= tempfile
.TemporaryFile()
191 status_r
, status_w
= os
.pipe()
193 # Note: Should ideally close status_r in the child, but we want to support Windows too
194 child
= subprocess
.Popen(_gnupg_options
+ ['--decrypt',
195 # Not all versions support this:
196 #'--max-output', str(1024 * 1024),
198 '--status-fd', str(status_w
)],
206 sigs
= _get_sigs_from_gpg_status_stream(status_r
, child
, errors
)
211 def _check_xml_stream(stream
):
212 xml_comment_start
= '<!-- Base64 Signature'
214 data_to_check
= stream
.read()
216 last_comment
= data_to_check
.rfind('\n' + xml_comment_start
)
218 raise SafeException("No signature block in XML. Maybe this file isn't signed?")
219 last_comment
+= 1 # Include new-line in data
221 data
= tempfile
.TemporaryFile()
222 data
.write(data_to_check
[:last_comment
])
224 os
.lseek(data
.fileno(), 0, 0)
226 errors
= tempfile
.TemporaryFile()
228 sig_lines
= data_to_check
[last_comment
:].split('\n')
229 if sig_lines
[0].strip() != xml_comment_start
:
230 raise SafeException('Bad signature block: extra data on comment line')
231 while sig_lines
and not sig_lines
[-1].strip():
233 if sig_lines
[-1].strip() != '-->':
234 raise SafeException('Bad signature block: last line is not end-of-comment')
235 sig_data
= '\n'.join(sig_lines
[1:-1])
237 if re
.match('^[ A-Za-z0-9+/=\n]+$', sig_data
) is None:
238 raise SafeException("Invalid characters found in base 64 encoded signature")
240 sig_data
= base64
.decodestring(sig_data
) # (b64decode is Python 2.4)
241 except Exception, ex
:
242 raise SafeException("Invalid base 64 encoded signature: " + str(ex
))
244 sig_fd
, sig_name
= tempfile
.mkstemp(prefix
= 'injector-sig-')
246 sig_file
= os
.fdopen(sig_fd
, 'w')
247 sig_file
.write(sig_data
)
250 status_r
, status_w
= os
.pipe()
252 # Note: Should ideally close status_r in the child, but we want to support Windows too
253 child
= subprocess
.Popen(_gnupg_options
+ [
254 # Not all versions support this:
255 #'--max-output', str(1024 * 1024),
257 '--status-fd', str(status_w
),
258 '--verify', sig_name
, '-'],
265 sigs
= _get_sigs_from_gpg_status_stream(status_r
, child
, errors
)
267 os
.lseek(stream
.fileno(), 0, 0)
271 return (stream
, sigs
)
def check_stream(stream):
    """Pass stream through gpg --decrypt to get the data, the error text,
    and a list of signatures (good or bad). If stream starts with "<?xml "
    then get the signature from a comment at the end instead (and the returned
    data is the original stream). stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if 'gpg' is not in $PATH, or if the stream starts
    with neither "<?xml " nor '-----B'"""
    if not find_in_path('gpg'):
        raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")

    # Sniff the document type from its first six bytes. Rewind before and
    # after the read so that the detection, the error preview below and the
    # checker we hand over to all start from the beginning of the document.
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)

    if start == '-----B':
        # Imported here so this function does not depend on a module-level import.
        import warnings
        # stacklevel = 2 attributes the warning to our caller.
        warnings.warn("Plain GPG-signed feeds are deprecated!", DeprecationWarning, stacklevel = 2)
        # Also reset the offset of the underlying file descriptor before
        # passing the stream on.
        os.lseek(stream.fileno(), 0, 0)
        return _check_plain_stream(stream)

    raise SafeException("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s" % repr(stream.read(120)))
299 def _get_sigs_from_gpg_status_stream(status_r
, child
, errors
):
300 """Read messages from status_r and collect signatures from it.
301 When done, reap 'child'.
302 If there are no signatures, throw SafeException (using errors
303 for the error message if non-empty)."""
306 # Should we error out on bad signatures, even if there's a good
309 for line
in os
.fdopen(status_r
):
310 assert line
.endswith('\n')
311 assert line
.startswith('[GNUPG:] ')
313 split_line
= line
.split(' ')
315 args
= split_line
[1:]
316 if code
== 'VALIDSIG':
317 sigs
.append(ValidSig(args
))
318 elif code
== 'BADSIG':
319 sigs
.append(BadSig(args
))
320 elif code
== 'ERRSIG':
321 sigs
.append(ErrSig(args
))
323 status
= child
.wait()
327 error_messages
= errors
.read().strip()
332 raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages
)
334 raise SafeException("No signatures found. No error messages from GPG.")