"""
Python interface to GnuPG.

This module is used to invoke GnuPG to check the digital signatures on interfaces.

@see: L{iface_cache.PendingFeed}
"""
# Copyright (C) 2006, Thomas Leonard
# See the README file for details, or visit http://0install.net.
import base64
import os
import re
import subprocess
import tempfile
from logging import info

from zeroinstall import support
from zeroinstall.injector.model import SafeException
from zeroinstall.injector.trust import trust_db
class Signature(object):
    """Abstract base class for signature check results.

    @ivar status: the space-separated fields that followed the status
        keyword on the GnuPG status line for this signature; subclasses
        index into it using their own field constants
    """
    status = None

    def __init__(self, status):
        """@param status: the fields of the GnuPG status line (a list of strings)"""
        self.status = status

    def is_trusted(self, domain=None):
        """Whether this signature is trusted by the user."""
        # The base class knows nothing about the key, so it never claims trust.
        # NOTE(review): body inferred (the line was missing) - confirm.
        return False

    # NOTE(review): the 'def' line was missing; the name 'need_key' is
    # inferred - confirm against callers.
    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature."""
        return None
class ValidSig(Signature):
    """A valid signature check result."""
    # Positions within the fields of a GnuPG "VALIDSIG" status line:
    #   <fingerprint> <sig_creation_date> <sig-timestamp> ...
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain=None):
        """Asks the L{trust.trust_db}."""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_timestamp(self):
        """Get the time this signature was made."""
        return int(self.status[self.TIMESTAMP])

    fingerprint = property(lambda self: self.status[self.FINGERPRINT])

    def get_details(self):
        """Call 'gpg --list-keys' and return the results split into lines and columns.
        @return: one list per output line, each holding that line's colon-separated fields
        @rtype: [[str]]"""
        child = subprocess.Popen(['gpg', '--with-colons', '--no-secmem-warning',
                                  '--list-keys', self.fingerprint],
                                 stdout=subprocess.PIPE,
                                 # Read text, not bytes, so the str splitting below works
                                 universal_newlines=True)
        cout, unused = child.communicate()
        # NOTE(review): the guard line was missing; logging only on a
        # non-zero exit status is inferred - confirm.
        if child.returncode:
            info("GPG exited with code %d" % child.returncode)
        return [line.split(':') for line in cout.split('\n')]
class BadSig(Signature):
    """A bad signature (doesn't match the message)."""
    # Position of the key ID within the fields of a GnuPG "BADSIG" status line
    KEYID = 0

    def __str__(self):
        return "BAD signature by " + self.status[self.KEYID] + \
            " (the message has been tampered with)"
class ErrSig(Signature):
    """Error while checking a signature."""
    # Positions within the fields of a GnuPG "ERRSIG" status line:
    #   <keyid> <pkalgo> <hashalgo> <sig_class> <time> <rc> [<fpr>]
    # RC is addressed by its fixed position rather than "last field",
    # because newer GnuPG versions append the fingerprint after it.
    KEYID = 0
    ALG = 1
    RC = 5

    # Reason codes documented for ERRSIG.
    # NOTE(review): the comparison lines were missing; these values are
    # taken from the GnuPG status-line documentation - confirm.
    _RC_UNSUPPORTED_ALGORITHM = 4
    _RC_MISSING_KEY = 9

    def __str__(self):
        msg = "ERROR signature by %s: " % self.status[self.KEYID]
        rc = int(self.status[self.RC])
        if rc == self._RC_UNSUPPORTED_ALGORITHM:
            msg += "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
        elif rc == self._RC_MISSING_KEY:
            msg += "Unknown key. Try 'gpg --recv-key %s'" % self.status[self.KEYID]
        else:
            msg += "Unknown reason code %d" % rc
        return msg

    # NOTE(review): the 'def' line was missing; the name 'need_key' is
    # inferred - confirm against callers.
    def need_key(self):
        """Returns the ID of the missing key, or None if the error has another cause."""
        rc = int(self.status[self.RC])
        if rc == self._RC_MISSING_KEY:
            return self.status[self.KEYID]
        return None
class Key(object):
    """A GPG key.
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: a short name for the key, extracted from the full name
    @type name: str
    """
    def __init__(self, fingerprint):
        self.fingerprint = fingerprint
        # Placeholder until a user ID for this key has been read from GnuPG
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with any trailing ' (comment)' and ' <email>' parts removed."""
        return self.name.split(' (', 1)[0].split(' <', 1)[0]
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to look up
    @return: a list of loaded keys, indexed by fingerprint
    @rtype: {str: L{Key}}"""
    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints:
        return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    # subprocess replaces the deprecated os.popen2; communicate() closes the
    # child's stdin and collects its output, as the popen2 pair did.
    child = subprocess.Popen(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys',
                              '--with-fingerprint', '--with-fingerprint'] + list(fingerprints),
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             universal_newlines=True)
    cout, unused = child.communicate()

    for line in cout.split('\n'):
        if line.startswith('pub:'):
            # A new primary key starts: forget the previous key's state.
            # NOTE(review): this branch body was missing and is inferred - confirm.
            current_fpr = None
            current_uid = None
        if line.startswith('fpr:'):
            current_fpr = line.split(':')[9]
            if current_fpr in keys and current_uid:
                # This is probably a subordinate key, where the fingerprint
                # comes after the uid, not before. Note: we assume the subkey is
                # cross-certified, as recent ones always are.
                keys[current_fpr].name = current_uid
        if line.startswith('uid:'):
            assert current_fpr is not None
            parts = line.split(':')
            current_uid = parts[9]
            if current_fpr in keys:
                keys[current_fpr].name = current_uid

    return keys
def load_key(fingerprint):
    """Query gpg for information about this key.
    @param fingerprint: the fingerprint of the key to look up
    @return: the entry for this fingerprint from L{load_keys}
    @rtype: L{Key}"""
    return load_keys([fingerprint])[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    @param stream: an open file containing the key(s) to import
    @raise SafeException: if GnuPG wrote anything to stderr"""
    errors = tempfile.TemporaryFile()
    try:
        child = subprocess.Popen(['gpg', '--no-secmem-warning', '--quiet', '--import'],
                                 stdin=stream, stderr=errors)
        # The exit status is not inspected; stderr output decides success.
        child.wait()

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file on every path, including a failed Popen
        errors.close()

    if error_messages:
        raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)
def _check_plain_stream(stream):
    """Pass a plain GPG-signed stream through 'gpg --decrypt'.
    @param stream: an open file positioned at the start of the signed message
    @return: (data_stream, [Signatures]), where data_stream is a temporary
        file holding gpg's output, rewound to the start"""
    data = tempfile.TemporaryFile()    # Python2.2 does not support 'prefix'
    errors = tempfile.TemporaryFile()

    status_r, status_w = os.pipe()

    try:
        # Note: Should ideally close status_r in the child, but we want to support Windows too
        child = subprocess.Popen(['gpg', '--no-secmem-warning', '--decrypt',
                                  # Not all versions support this:
                                  #'--max-output', str(1024 * 1024),
                                  # NOTE(review): one argument line was missing
                                  # here; '--batch' is inferred - confirm.
                                  '--batch',
                                  '--status-fd', str(status_w)],
                                 # NOTE(review): the stdio argument lines were
                                 # missing and are inferred - confirm.
                                 stdin=stream,
                                 stdout=data,
                                 stderr=errors)
    finally:
        # Only the child writes status lines; the parent must drop its copy
        # of the write end so that reading status_r can reach end-of-file.
        os.close(status_w)

    try:
        sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
    finally:
        data.seek(0)
    return (data, sigs)
def _check_xml_stream(stream):
    """Check an XML document whose signature is in a comment at the end.

    The document must end with a comment that starts on its own line with
    '<!-- Base64 Signature', contains the base64-encoded signature, and ends
    with a line containing only '-->'. Everything before that comment
    (including the preceding new-line) is the signed data.

    @param stream: a seekable file positioned at the start of the document;
        read() is expected to return a string
    @return: (stream, [Signatures]) - the stream passed in, rewound to the start
    @raise SafeException: if the signature block is missing or malformed"""
    xml_comment_start = '<!-- Base64 Signature'

    data_to_check = stream.read()

    last_comment = data_to_check.rfind('\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException("No signature block in XML. Maybe this file isn't signed?")
    last_comment += 1    # Include new-line in data

    # Validate and decode the signature before creating any temporary files,
    # so the error paths below have nothing to clean up.
    sig_lines = data_to_check[last_comment:].split('\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException('Bad signature block: extra data on comment line')
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != '-->':
        raise SafeException('Bad signature block: last line is not end-of-comment')
    sig_data = '\n'.join(sig_lines[1:-1])

    if re.match('^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException("Invalid characters found in base 64 encoded signature")
    try:
        sig_data = base64.b64decode(sig_data)
    except Exception as ex:
        raise SafeException("Invalid base 64 encoded signature: " + str(ex))

    data = tempfile.TemporaryFile()
    errors = tempfile.TemporaryFile()
    sig_fd, sig_name = tempfile.mkstemp(prefix='injector-sig-')
    try:
        data.write(data_to_check[:last_comment])
        data.flush()
        # Rewind the descriptor itself: gpg inherits it as stdin
        os.lseek(data.fileno(), 0, 0)

        # Binary mode: the decoded signature is raw bytes and must not
        # undergo new-line translation.
        sig_file = os.fdopen(sig_fd, 'wb')
        try:
            sig_file.write(sig_data)
        finally:
            sig_file.close()

        status_r, status_w = os.pipe()

        try:
            # Note: Should ideally close status_r in the child, but we want to support Windows too
            child = subprocess.Popen(['gpg', '--no-secmem-warning',
                                      # Not all versions support this:
                                      #'--max-output', str(1024 * 1024),
                                      # NOTE(review): one argument line was missing
                                      # here; '--batch' is inferred - confirm.
                                      '--batch',
                                      '--status-fd', str(status_w),
                                      '--verify', sig_name, '-'],
                                     # NOTE(review): the stdio argument lines were
                                     # missing and are inferred - confirm.
                                     stdin=data,
                                     stderr=errors)
        finally:
            # Drop the parent's copy of the write end so that reading
            # status_r can reach end-of-file.
            os.close(status_w)

        try:
            sigs = _get_sigs_from_gpg_status_stream(status_r, child, errors)
        finally:
            # Hand the caller's stream back rewound, at both the descriptor
            # and the file-object level.
            os.lseek(stream.fileno(), 0, 0)
            stream.seek(0)
    finally:
        os.unlink(sig_name)
        data.close()
        errors.close()
    return (stream, sigs)
def check_stream(stream):
    """Pass stream through gpg --decrypt to get the data, the error text,
    and a list of signatures (good or bad). If stream starts with "<?xml "
    then get the signature from a comment at the end instead (and the returned
    data is the original stream). stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if gpg is not installed, or the stream is neither
        an XML document nor a plain GPG-signed message"""
    if not support.find_in_path('gpg'):
        raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")

    # Sniff the first few bytes, then rewind: the XML checker reads the whole
    # document from the current position, and the error message below quotes
    # the start of the stream.
    # NOTE(review): both rewinds were missing lines and are inferred - confirm.
    stream.seek(0)
    start = stream.read(6)
    stream.seek(0)

    if start == "<?xml ":
        return _check_xml_stream(stream)
    elif start == '-----B':
        import warnings
        warnings.warn("Plain GPG-signed feeds are deprecated!", DeprecationWarning, stacklevel=2)
        # gpg reads from the inherited descriptor, so rewind that too
        os.lseek(stream.fileno(), 0, 0)
        return _check_plain_stream(stream)
    else:
        raise SafeException("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s" % repr(stream.read(120)))
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    When done, reap 'child'.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).
    @param status_r: file descriptor for the read end of gpg's --status-fd pipe
    @param child: the running gpg process (must provide wait())
    @param errors: file object that received gpg's stderr; it is read and closed here
    @return: the signatures found, in the order reported
    @rtype: [L{Signature}]"""
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    status_stream = os.fdopen(status_r)
    try:
        for line in status_stream:
            assert line.endswith('\n')
            assert line.startswith('[GNUPG:] ')
            # Drop the '[GNUPG:] ' prefix and the trailing new-line.
            # NOTE(review): this step and the 'code' binding were missing
            # lines and are inferred - confirm.
            split_line = line[9:-1].split(' ')
            code = split_line[0]
            args = split_line[1:]
            if code == 'VALIDSIG':
                sigs.append(ValidSig(args))
            elif code == 'BADSIG':
                sigs.append(BadSig(args))
            elif code == 'ERRSIG':
                sigs.append(ErrSig(args))
    finally:
        # Close the pipe promptly rather than waiting for garbage collection
        status_stream.close()

    # Reap the child; its exit status is not inspected.
    child.wait()

    errors.seek(0)
    error_messages = errors.read().strip()
    errors.close()

    if not sigs:
        if error_messages:
            raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)
        else:
            raise SafeException("No signatures found. No error messages from GPG.")

    return sigs