2 Python interface to GnuPG.
4 This module is used to invoke GnuPG to check the digital signatures on interfaces.
6 @see: L{iface_cache.PendingFeed}
9 # Copyright (C) 2006, Thomas Leonard
10 # See the README file for details, or visit http://0install.net.
16 from logging
import warn
17 from trust
import trust_db
18 from model
import SafeException
20 class Signature(object):
21 """Abstract base class for signature check results."""
24 def __init__(self
, status
):
27 def is_trusted(self
, domain
= None):
31 """Returns the ID of the key that must be downloaded to check this signature."""
34 class ValidSig(Signature
):
35 """A valid signature check result."""
40 return "Valid signature from " + self
.status
[self
.FINGERPRINT
]
42 def is_trusted(self
, domain
= None):
43 return trust_db
.is_trusted(self
.status
[self
.FINGERPRINT
], domain
)
45 def get_timestamp(self
):
46 return int(self
.status
[self
.TIMESTAMP
])
48 fingerprint
= property(lambda self
: self
.status
[self
.FINGERPRINT
])
50 def get_details(self
):
51 cin
, cout
= os
.popen2(('gpg', '--with-colons', '--no-secmem-warning', '--list-keys', self
.fingerprint
))
55 details
.append(line
.split(':'))
59 class BadSig(Signature
):
60 """A bad signature (doesn't match the message)."""
64 return "BAD signature by " + self
.status
[self
.KEYID
] + \
65 " (the message has been tampered with)"
67 class ErrSig(Signature
):
68 """Error while checking a signature."""
74 msg
= "ERROR signature by %s: " % self
.status
[self
.KEYID
]
75 rc
= int(self
.status
[self
.RC
])
77 msg
+= "Unknown or unsupported algorithm '%s'" % self
.status
[self
.ALG
]
79 msg
+= "Unknown key. Try 'gpg --recv-key %s'" % self
.status
[self
.KEYID
]
81 msg
+= "Unknown reason code %d" % rc
85 rc
= int(self
.status
[self
.RC
])
87 return self
.status
[self
.KEYID
]
94 def __init__(self
, fingerprint
):
95 self
.fingerprint
= fingerprint
96 self
.name
= '(unknown)'
98 def get_short_name(self
):
99 return self
.name
.split(' (', 1)[0].split(' <', 1)[0]
101 def load_keys(fingerprints
):
102 """Load a set of keys at once.
103 This is much more efficient than making individual calls to L{load_key}.
104 @return: a list of loaded keys, indexed by fingerprint
105 @rtype: {str: L{Key}}
110 # Otherwise GnuPG returns everything...
111 if not fingerprints
: return keys
113 for fp
in fingerprints
:
118 cin
, cout
= os
.popen2(['gpg', '--fixed-list-mode', '--with-colons', '--list-keys', '--with-fingerprint'] + fingerprints
)
122 if line
.startswith('pub:'):
124 if line
.startswith('fpr:'):
125 assert current_fpr
is None
126 current_fpr
= line
.split(':')[9]
127 if line
.startswith('uid:'):
128 assert current_fpr
is not None
129 parts
= line
.split(':')
130 if current_fpr
in keys
:
131 keys
[current_fpr
].name
= parts
[9]
133 warn("Got information about key '%s', but I only asked about '%s'!", current_fpr
, fingerprints
)
139 def load_key(fingerprint
):
140 """Query gpg for information about this key.
144 return load_keys([fingerprint
])[fingerprint
]
146 def import_key(stream
):
147 """Run C{gpg --import} with this stream as stdin."""
148 errors
= tempfile
.TemporaryFile()
155 os
.dup2(stream
.fileno(), 0)
156 os
.dup2(errors
.fileno(), 2)
157 os
.execlp('gpg', 'gpg', '--no-secmem-warning', '--quiet', '--import')
159 traceback
.print_exc()
164 pid
, status
= os
.waitpid(child
, 0)
168 error_messages
= errors
.read().strip()
172 raise SafeException("Errors from 'gpg --import':\n%s" % error_messages
)
174 def _check_plain_stream(stream
):
175 data
= tempfile
.TemporaryFile() # Python2.2 does not support 'prefix'
176 errors
= tempfile
.TemporaryFile()
178 status_r
, status_w
= os
.pipe()
187 os
.dup2(stream
.fileno(), 0)
188 os
.dup2(data
.fileno(), 1)
189 os
.dup2(errors
.fileno(), 2)
190 os
.execlp('gpg', 'gpg', '--no-secmem-warning', '--decrypt',
191 # Not all versions support this:
192 #'--max-output', str(1024 * 1024),
194 '--status-fd', str(status_w
))
196 traceback
.print_exc()
205 sigs
= _get_sigs_from_gpg_status_stream(status_r
, child
, errors
)
210 def _check_xml_stream(stream
):
211 xml_comment_start
= '<!-- Base64 Signature'
213 data_to_check
= stream
.read()
215 last_comment
= data_to_check
.rfind('\n' + xml_comment_start
)
217 raise SafeException("No signature block in XML. Maybe this file isn't signed?")
218 last_comment
+= 1 # Include new-line in data
220 data
= tempfile
.TemporaryFile()
221 data
.write(data_to_check
[:last_comment
])
223 os
.lseek(data
.fileno(), 0, 0)
225 errors
= tempfile
.TemporaryFile()
227 sig_lines
= data_to_check
[last_comment
:].split('\n')
228 if sig_lines
[0].strip() != xml_comment_start
:
229 raise SafeException('Bad signature block: extra data on comment line')
230 while sig_lines
and not sig_lines
[-1].strip():
232 if sig_lines
[-1].strip() != '-->':
233 raise SafeException('Bad signature block: last line is not end-of-comment')
234 sig_data
= '\n'.join(sig_lines
[1:-1])
236 if re
.match('^[ A-Za-z0-9+/=\n]+$', sig_data
) is None:
237 raise SafeException("Invalid characters found in base 64 encoded signature")
239 sig_data
= base64
.decodestring(sig_data
) # (b64decode is Python 2.4)
240 except Exception, ex
:
241 raise SafeException("Invalid base 64 encoded signature: " + str(ex
))
243 sig_fd
, sig_name
= tempfile
.mkstemp(prefix
= 'injector-sig-')
245 sig_file
= os
.fdopen(sig_fd
, 'w')
246 sig_file
.write(sig_data
)
249 status_r
, status_w
= os
.pipe()
258 os
.dup2(data
.fileno(), 0)
259 os
.dup2(errors
.fileno(), 2)
260 os
.execlp('gpg', 'gpg', '--no-secmem-warning',
261 # Not all versions support this:
262 #'--max-output', str(1024 * 1024),
264 '--status-fd', str(status_w
),
265 '--verify', sig_name
, '-')
267 traceback
.print_exc()
276 sigs
= _get_sigs_from_gpg_status_stream(status_r
, child
, errors
)
278 os
.lseek(stream
.fileno(), 0, 0)
282 return (stream
, sigs
)
284 def _find_in_path(prog
):
285 for d
in os
.environ
['PATH'].split(':'):
286 path
= os
.path
.join(d
, prog
)
287 if os
.path
.isfile(path
):
291 def check_stream(stream
):
292 """Pass stream through gpg --decrypt to get the data, the error text,
293 and a list of signatures (good or bad). If stream starts with "<?xml "
294 then get the signature from a comment at the end instead (and the returned
295 data is the original stream). stream must be seekable.
296 @note: Stream returned may or may not be the one passed in. Be careful!
297 @return: (data_stream, [Signatures])"""
298 if not _find_in_path('gpg'):
299 raise SafeException("GnuPG is not installed ('gpg' not in $PATH). See http://gnupg.org")
305 start
= stream
.read(6)
307 if start
== "<?xml ":
308 return _check_xml_stream(stream
)
310 os
.lseek(stream
.fileno(), 0, 0)
311 return _check_plain_stream(stream
)
313 def _get_sigs_from_gpg_status_stream(status_r
, child
, errors
):
314 """Read messages from status_r and collect signatures from it.
315 When done, reap 'child'.
316 If there are no signatures, throw SafeException (using errors
317 for the error message if non-empty)."""
320 # Should we error out on bad signatures, even if there's a good
323 for line
in os
.fdopen(status_r
):
324 assert line
.endswith('\n')
325 assert line
.startswith('[GNUPG:] ')
327 split_line
= line
.split(' ')
329 args
= split_line
[1:]
330 if code
== 'VALIDSIG':
331 sigs
.append(ValidSig(args
))
332 elif code
== 'BADSIG':
333 sigs
.append(BadSig(args
))
334 elif code
== 'ERRSIG':
335 sigs
.append(ErrSig(args
))
337 pid
, status
= os
.waitpid(child
, 0)
342 error_messages
= errors
.read().strip()
347 raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages
)
349 raise SafeException("No signatures found. No error messages from GPG.")