"""
Python interface to GnuPG.

This module is used to invoke GnuPG to check the digital signatures on interfaces.

@see: L{iface_cache.PendingFeed}
"""

# Copyright (C) 2009, Thomas Leonard
# See the README file for details, or visit http://0install.net.
import base64
import os
import re
import subprocess
import tempfile

from zeroinstall import _, logger
from zeroinstall.support import find_in_path, basedir
from zeroinstall.injector.trust import trust_db
from zeroinstall.injector.model import SafeException
# Command prefix (executable + common options) shared by every gpg invocation.
# Built lazily by _run_gpg() the first time it is needed.
_gnupg_options = None


def _run_gpg(args, **kwargs):
    """Start GnuPG with the given arguments and return the running process.

    The executable is taken from $ZEROINSTALL_GPG if set, otherwise the first
    of 'gpg' / 'gpg2' found in the search path, falling back to plain 'gpg'.
    The resulting command prefix is cached in the module-level
    C{_gnupg_options} list so the lookup only happens once.

    @param args: extra command-line arguments for gpg
    @type args: [str]
    @param kwargs: passed straight through to C{subprocess.Popen}
    @return: the child process, opened with universal_newlines = True
    @rtype: subprocess.Popen"""
    global _gnupg_options
    if _gnupg_options is None:
        gpg_path = (os.environ.get('ZEROINSTALL_GPG')
                    or find_in_path('gpg')
                    or find_in_path('gpg2')
                    or 'gpg')
        options = [gpg_path, '--no-secmem-warning']

        # When running as root without an explicit GNUPGHOME, point gpg at the
        # keyring under basedir.home.
        running_as_root = hasattr(os, 'geteuid') and os.geteuid() == 0
        if running_as_root and 'GNUPGHOME' not in os.environ:
            options += ['--homedir', os.path.join(basedir.home, '.gnupg')]
            logger.info(_("Running as root, so setting GnuPG home to %s"), options[-1])

        # Publish only once fully built.
        _gnupg_options = options

    return subprocess.Popen(_gnupg_options + args, universal_newlines=True, **kwargs)
class Signature(object):
    """Abstract base class for signature check results.
    @ivar status: the raw data returned by GPG
    @ivar messages: any messages printed by GPG which may be relevant to this signature
    """
    status = None
    messages = None

    def __init__(self, status):
        """@param status: the raw data returned by GPG for this signature"""
        self.status = status

    def is_trusted(self, domain=None):
        """Whether this signature is trusted by the user."""
        # The base class never vouches for anything; subclasses override this.
        return False

    # NOTE(review): the method name was not visible in the damaged source;
    # 'need_key' is reconstructed - confirm against callers.
    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature."""
        return None
class ValidSig(Signature):
    """A valid signature check result."""
    # Positions within the arguments of GnuPG's VALIDSIG status line
    # (fingerprint, creation date, signature timestamp, ...).
    FINGERPRINT = 0
    TIMESTAMP = 2

    def __str__(self):
        return "Valid signature from " + self.status[self.FINGERPRINT]

    def is_trusted(self, domain=None):
        """Asks the L{trust.trust_db}."""
        return trust_db.is_trusted(self.status[self.FINGERPRINT], domain)

    def get_timestamp(self):
        """Get the time this signature was made."""
        return int(self.status[self.TIMESTAMP])

    fingerprint = property(lambda self: self.status[self.FINGERPRINT])

    def get_details(self):
        """Call 'gpg --list-keys' and return the results split into lines and columns.
        @rtype: [[str]]"""
        # Note: GnuPG 2 always uses --fixed-list-mode
        child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys', self.fingerprint],
                         stdout=subprocess.PIPE)
        cout, unused = child.communicate()
        if child.returncode:
            # A failure is only logged; whatever output was produced is still returned.
            logger.info(_("GPG exited with code %d") % child.returncode)
        return [line.split(':') for line in cout.split('\n')]
class BadSig(Signature):
    """A bad signature (doesn't match the message)."""
    # Position of the key ID within the arguments of GnuPG's BADSIG status line.
    KEYID = 0

    def __str__(self):
        return _("BAD signature by %s (the message has been tampered with)") \
            % self.status[self.KEYID]
class ErrSig(Signature):
    """Error while checking a signature."""
    # Positions within the arguments of GnuPG's ERRSIG status line:
    #   <keyid> <pkalgo> <hashalgo> <sig_class> <time> <rc> [<fpr>]
    # NOTE(review): these values were not visible in the damaged source and are
    # reconstructed from the GnuPG status format. RC uses a fixed index so that
    # the optional trailing fingerprint field is not mistaken for the code.
    KEYID = 0
    ALG = 1
    RC = 5

    # Reason codes reported by GnuPG in the <rc> field.
    _RC_UNSUPPORTED_ALGORITHM = 4
    _RC_MISSING_KEY = 9

    def __str__(self):
        msg = _("ERROR signature by %s: ") % self.status[self.KEYID]
        rc = int(self.status[self.RC])
        if rc == self._RC_UNSUPPORTED_ALGORITHM:
            msg += _("Unknown or unsupported algorithm '%s'") % self.status[self.ALG]
        elif rc == self._RC_MISSING_KEY:
            msg += _("Unknown key. Try 'gpg --recv-key %s'") % self.status[self.KEYID]
        else:
            msg += _("Unknown reason code %d") % rc
        return msg

    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this
        signature, or None if the failure was not caused by a missing key."""
        rc = int(self.status[self.RC])
        if rc == self._RC_MISSING_KEY:
            return self.status[self.KEYID]
        return None
class Key(object):
    """A GPG key.
    @param fingerprint: the fingerprint of the key
    @type fingerprint: str
    @ivar name: a short name for the key, extracted from the full name
    @type name: str
    """
    def __init__(self, fingerprint):
        self.fingerprint = fingerprint
        # Placeholder until a user ID has been read for this key.
        self.name = '(unknown)'

    def get_short_name(self):
        """Return the name with any ' (comment)' and ' <email>' suffix removed.
        @rtype: str"""
        without_comment = self.name.split(' (', 1)[0]
        return without_comment.split(' <', 1)[0]
def load_keys(fingerprints):
    """Load a set of keys at once.
    This is much more efficient than making individual calls to L{load_key}.
    @param fingerprints: the fingerprints of the keys to look up
    @type fingerprints: [str]
    @return: a list of loaded keys, indexed by fingerprint
    @rtype: {str: L{Key}}"""
    import codecs

    keys = {}

    # Otherwise GnuPG returns everything...
    if not fingerprints: return keys

    for fp in fingerprints:
        keys[fp] = Key(fp)

    current_fpr = None
    current_uid = None

    child = _run_gpg(['--fixed-list-mode', '--with-colons', '--list-keys',
                      '--with-fingerprint', '--with-fingerprint'] + list(fingerprints),
                     stdout=subprocess.PIPE)
    try:
        for line in child.stdout:
            if line.startswith('pub:'):
                # A new primary key starts: forget the previous key's state.
                current_fpr = None
                current_uid = None
            if line.startswith('fpr:'):
                current_fpr = line.split(':')[9]
                if current_fpr in keys and current_uid:
                    # This is probably a subordinate key, where the fingerprint
                    # comes after the uid, not before. Note: we assume the subkey is
                    # cross-certified, as recent ones always are.
                    name = current_uid
                    if isinstance(name, bytes):
                        # Only raw bytes need decoding; text output is already decoded.
                        try:
                            name = codecs.decode(name, 'utf-8')
                        except UnicodeError:
                            logger.warn("Not UTF-8: %s", current_uid)
                    keys[current_fpr].name = name
            if line.startswith('uid:'):
                assert current_fpr is not None
                # Only take primary UID
                if current_uid: continue
                parts = line.split(':')
                current_uid = parts[9]
                if current_fpr in keys:
                    keys[current_fpr].name = current_uid
    finally:
        child.stdout.close()

        if child.wait():
            logger.warn(_("gpg --list-keys failed with exit code %d") % child.returncode)

    return keys
def load_key(fingerprint):
    """Query gpg for information about this key.
    @param fingerprint: the fingerprint of the key to look up
    @type fingerprint: str
    @return: a new key
    @rtype: L{Key}"""
    loaded = load_keys([fingerprint])
    return loaded[fingerprint]
def import_key(stream):
    """Run C{gpg --import} with this stream as stdin.
    @param stream: an open file containing the key data (passed to the child as its stdin)
    @raise SafeException: if gpg exits with a non-zero status"""
    with tempfile.TemporaryFile(mode='w+t') as errors:
        child = _run_gpg(['--quiet', '--import', '--batch'],
                         stdin=stream, stderr=errors)

        status = child.wait()

        # The child wrote through the same file, so rewind before reading.
        errors.seek(0)
        error_messages = errors.read().strip()

    if status != 0:
        if error_messages:
            raise SafeException(_("Errors from 'gpg --import':\n%s") % error_messages)
        raise SafeException(_("Non-zero exit code %d from 'gpg --import'") % status)

    if error_messages:
        # gpg succeeded but still printed something: report it without failing.
        logger.warn(_("Warnings from 'gpg --import':\n%s") % error_messages)
def _split_xml_signature(data_to_check):
    """Split a signed XML document into the signed bytes and the decoded signature.
    @param data_to_check: the whole document
    @type data_to_check: bytes
    @return: (signed data including its final new-line, raw signature bytes)
    @rtype: (bytes, bytes)
    @raise SafeException: if the trailing signature comment is missing or malformed"""
    xml_comment_start = b'<!-- Base64 Signature'

    last_comment = data_to_check.rfind(b'\n' + xml_comment_start)
    if last_comment < 0:
        raise SafeException(_("No signature block in XML. Maybe this file isn't signed?"))
    last_comment += 1   # Include new-line in data

    sig_lines = data_to_check[last_comment:].split(b'\n')
    if sig_lines[0].strip() != xml_comment_start:
        raise SafeException(_('Bad signature block: extra data on comment line'))
    # Ignore trailing blank lines (the first line is non-blank, so this stops there).
    while sig_lines and not sig_lines[-1].strip():
        del sig_lines[-1]
    if sig_lines[-1].strip() != b'-->':
        raise SafeException(_('Bad signature block: last line is not end-of-comment'))
    sig_data = b'\n'.join(sig_lines[1:-1])

    if re.match(b'^[ A-Za-z0-9+/=\n]+$', sig_data) is None:
        raise SafeException(_("Invalid characters found in base 64 encoded signature"))
    try:
        if hasattr(base64, 'decodebytes'):
            sig_data = base64.decodebytes(sig_data)     # Python 3
        else:
            sig_data = base64.decodestring(sig_data)    # Python 2
    except Exception as ex:
        raise SafeException(_("Invalid base 64 encoded signature: %s") % str(ex))

    return data_to_check[:last_comment], sig_data


def _check_xml_stream(stream):
    """Verify the Base64 signature comment at the end of an XML stream.
    @param stream: a seekable binary stream backed by a real file descriptor
    @return: (stream rewound to the start, list of signatures)
    @rtype: (file, [L{Signature}])
    @raise SafeException: if the signature block is missing or malformed, or no signatures are found"""
    signed_data, sig_data = _split_xml_signature(stream.read())

    # Copy the file to 'data', without the signature
    # Copy the signature to 'sig'

    with tempfile.TemporaryFile(mode='w+b') as data:
        data.write(signed_data)
        data.flush()
        os.lseek(data.fileno(), 0, 0)

        with tempfile.TemporaryFile('w+t') as errors:
            sig_file = tempfile.NamedTemporaryFile(prefix='injector-sig-', mode='wb', delete=False)
            try:
                # Close the file before gpg opens it by name.
                with sig_file:
                    sig_file.write(sig_data)

                # Note: Should ideally close status_r in the child, but we want to support Windows too
                child = _run_gpg([# Not all versions support this:
                                  #'--max-output', str(1024 * 1024),
                                  '--batch',
                                  # Windows GPG can only cope with "1" here
                                  '--status-fd', '1',
                                  # Don't try to download missing keys; we'll do that
                                  '--keyserver-options', 'no-auto-key-retrieve',
                                  '--verify', sig_file.name, '-'],
                                 stdin=data,
                                 stdout=subprocess.PIPE,
                                 stderr=errors)

                try:
                    sigs = _get_sigs_from_gpg_status_stream(child.stdout, child, errors)
                finally:
                    # Reap the child first so a failure to rewind cannot leave it behind.
                    child.stdout.close()
                    child.wait()
                    os.lseek(stream.fileno(), 0, 0)
                    stream.seek(0)
            finally:
                os.unlink(sig_file.name)
            return (stream, sigs)
def check_stream(stream):
    """Check the signatures on a feed. The stream must start with "<?xml ";
    the signature is read from a comment at the end of the document and the
    returned data is the original stream. Plain GPG-signed input (starting
    "-----B") is rejected. stream must be seekable.
    @note: Stream returned may or may not be the one passed in. Be careful!
    @return: (data_stream, [Signatures])
    @raise SafeException: if the stream is not an XML document or cannot be verified"""

    stream.seek(0)

    start = stream.read(6)
    stream.seek(0)

    if start == b"<?xml ":
        return _check_xml_stream(stream)
    if start == b'-----B':
        raise SafeException(_("Plain GPG-signed feeds no longer supported"))
    raise SafeException(_("This is not a Zero Install feed! It should be an XML document, but it starts:\n%s") % repr(stream.read(120)))
def _get_sigs_from_gpg_status_stream(status_r, child, errors):
    """Read messages from status_r and collect signatures from it.
    If there are no signatures, throw SafeException (using errors
    for the error message if non-empty).
    @param status_r: text stream carrying GnuPG's --status-fd output
    @param child: the gpg process; not used here, so the caller must wait for it
    @param errors: seekable text file holding gpg's stderr output
    @return: the signatures found, in the order reported
    @rtype: [L{Signature}]
    @raise SafeException: if no signatures were reported"""
    sig_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }
    prefix = '[GNUPG:] '
    sigs = []

    # Should we error out on bad signatures, even if there's a good
    # signature too?

    for line in status_r:
        assert line.endswith('\n')
        if not line.startswith(prefix):
            # The docs says every line starts with this, but if auto-key-retrieve
            # is on then they might not. See bug #3420548
            logger.warn("Invalid output from GnuPG: %r", line)
            continue

        # Drop the prefix and the trailing new-line, then split keyword from arguments.
        split_line = line[len(prefix):-1].split(' ')
        code = split_line[0]
        args = split_line[1:]
        sig_type = sig_types.get(code)
        if sig_type is not None:
            sigs.append(sig_type(args))

    # gpg wrote through the same file, so rewind before reading its messages.
    errors.seek(0)

    error_messages = errors.read().strip()

    if not sigs:
        if error_messages:
            raise SafeException(_("No signatures found. Errors from GPG:\n%s") % error_messages)
        raise SafeException(_("No signatures found. No error messages from GPG."))

    if error_messages:
        # Attach the warnings to all the signatures, in case they're useful.
        for s in sigs:
            s.messages = error_messages

    return sigs