Don't try to scan implementation contents for distribution packages.
[0publish-gui.git] / signing.py
blob ba8b7daee85cbb6584a5b7fa86abaa30f410042b
1 from zeroinstall import SafeException
2 from zeroinstall.injector import gpg
3 import tempfile, os, base64, sys, shutil, signal
4 import subprocess
5 from rox import tasks, g
6 import gobject
# Remember the process umask so that write_tmp() can apply it to the files it
# creates. os.umask() only reports the current mask as a side effect of
# setting a new one, so set a throw-away value and restore the real one at once.
umask = os.umask(0)
os.umask(umask)
class LineBuffer:
    """Accumulate text chunks and hand back the complete lines found in them.

    Text is appended with add(). Iterating over the buffer yields every
    newline-terminated line (without its newline) and removes it from the
    buffer; an unterminated trailing fragment stays in 'data' until a later
    add() completes it.
    """

    def __init__(self):
        # Text received so far that has not yet been yielded as a line.
        self.data = ''

    def add(self, new):
        """Append the non-empty string 'new' to the pending text."""
        assert new
        self.data = self.data + new

    def __iter__(self):
        """Yield, and consume, each complete line currently in the buffer."""
        while True:
            newline = self.data.find('\n')
            if newline < 0:
                break
            line = self.data[:newline]
            # Drop the line and its terminator before yielding, so the buffer
            # is already consistent if the consumer stops iterating here.
            self.data = self.data[newline + 1:]
            yield line
24 def _io_callback(src, cond, blocker):
25 blocker.trigger()
26 return False
# (version in ROX-Lib < 2.0.4 is buggy; missing IO_HUP)
class InputBlocker(tasks.Blocker):
    """Triggers when os.read(stream) would not block."""

    # Id returned by gobject.io_add_watch() while a watch is installed,
    # otherwise None.
    _tag = None
    # The stream (file descriptor) being watched.
    _stream = None

    def __init__(self, stream):
        tasks.Blocker.__init__(self)
        self._stream = stream

    def add_task(self, task):
        """Register 'task' and make sure an IO watch is installed on the stream."""
        tasks.Blocker.add_task(self, task)
        if self._tag is not None:
            # A watch is already in place; one is enough for all tasks.
            return
        # Watch for hang-up as well as readable data, so a closed pipe wakes us.
        conditions = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, conditions,
                                         _io_callback, self)

    def remove_task(self, task):
        """Unregister 'task'; drop the IO watch once no tasks remain."""
        tasks.Blocker.remove_task(self, task)
        # _rox_lib_tasks is presumably the set of waiting tasks maintained by
        # tasks.Blocker -- verify against ROX-Lib.
        if self._rox_lib_tasks:
            return
        gobject.source_remove(self._tag)
        self._tag = None
def _gpg_colon_listing(*arguments):
    """Run gpg with 'arguments' and return its output as a list of field lists.

    Each non-empty line of the machine-readable (--with-colons) output is
    split on ':'. Raises Exception if gpg exits with a non-zero status.
    """
    child = subprocess.Popen(('gpg',) + arguments, stdout=subprocess.PIPE)
    stdout, _ = child.communicate()
    status = child.wait()
    if status:
        raise Exception("GPG failed with exit code %d" % status)
    return [line.split(':') for line in stdout.split('\n') if line]


def get_secret_keys():
    """List the secret keys available to gpg.

    Returns a list of [fingerprint, identity] lists, one per secret primary
    key. 'identity' is the primary user ID taken from the matching public
    key, or None if gpg reported none.

    Raises Exception if any gpg invocation exits with a non-zero status.
    """
    # First, collect fingerprints for available secret keys.
    # GnuPG 2.1+ also prints an 'fpr' record after every secret subkey
    # ('ssb'); those must be skipped or each subkey would be offered as a
    # separate signing key.
    keys = []
    owner = None    # type of the key record the next 'fpr' record belongs to
    for fields in _gpg_colon_listing('--list-secret-keys', '--with-colons',
                                     '--with-fingerprint'):
        record = fields[0]
        if record in ('sec', 'ssb'):
            owner = record
        elif record == 'fpr' and owner != 'ssb' and len(fields) > 9:
            keys.append([fields[9], None])

    # When listing secret keys, the identity shown may not be the primary
    # identity as selected by the user or shown when verifying a signature.
    # However, the primary identity can be obtained by listing the
    # accompanying public key.
    for key in keys:
        for fields in _gpg_colon_listing('--list-keys', '--with-colons',
                                         '--fingerprint', key[0]):
            # Older gpg puts the primary user ID in field 10 of the 'pub'
            # record. GnuPG 2.1+ leaves that field empty and reports user IDs
            # in 'uid' records instead, the first of which is the primary one.
            if fields[0] in ('pub', 'uid') and len(fields) > 9 and fields[9]:
                key[1] = fields[9]
                break
    return keys
def check_signature(path):
    """Load the file at 'path' and check any GPG signature it carries.

    Returns a tuple (data, sign_fn, fingerprint):
      - data: the file's contents with the signature removed;
      - sign_fn: the function to use when saving the file again
        (sign_unsigned for an unsigned file, sign_xml otherwise);
      - fingerprint: fingerprint of the first valid signature, or None if the
        file is unsigned.

    Raises Exception if the file is signed but no signature is valid.
    """
    stream = open(path)
    try:
        data = stream.read()
    finally:
        stream.close()

    xml_comment = data.rfind('\n<!-- Base64 Signature')
    if xml_comment < 0 and not data.startswith('-----BEGIN'):
        # Not signed at all.
        return data, sign_unsigned, None

    # Old-style '-----BEGIN' files are also re-saved in the XML comment format
    # (don't support saving as plain).
    sign_fn = sign_xml

    signed_stream = open(path)
    try:
        data_stream, sigs = gpg.check_stream(signed_stream)
        try:
            if xml_comment >= 0:
                # The signature is a trailing XML comment: keep everything up
                # to and including the newline that precedes it.
                data = data[:xml_comment + 1]
            else:
                data = data_stream.read()
        finally:
            data_stream.close()
    finally:
        # Closing is harmless if data_stream was this same object.
        signed_stream.close()

    for sig in sigs:
        if isinstance(sig, gpg.ValidSig):
            return data, sign_fn, sig.fingerprint

    error = "ERROR: No valid signatures found!\n"
    for sig in sigs:
        error += "\nGot: %s" % sig
    error += '\n\nTo edit it anyway, remove the signature using a text editor.'
    raise Exception(error)
def write_tmp(path, data):
    """Create a temporary file in the same directory as 'path' and write data to it.

    The file is given mode 0644 restricted by the process umask (mkstemp
    itself creates files readable only by the owner). Returns the temporary
    file's name; the caller is expected to rename it over 'path'.

    If writing or changing the mode fails, the temporary file is removed
    before the exception propagates.
    """
    fd, tmp = tempfile.mkstemp(prefix='tmp-', dir=os.path.dirname(path))
    complete = False
    try:
        stream = os.fdopen(fd, 'w')
        try:
            stream.write(data)
        finally:
            stream.close()
        # Octal 644 (rw-r--r--), spelled so that it parses on every Python
        # version.
        os.chmod(tmp, int('644', 8) & ~umask)
        complete = True
    finally:
        if not complete:
            # Don't leave a stray tmp-* file next to the target.
            os.unlink(tmp)
    return tmp
def run_gpg(default_key, *arguments):
    """Run gpg with 'arguments' and wait for it to finish.

    If 'default_key' is not None, '--default-key <default_key>' is placed
    before the other arguments. Raises SafeException if gpg does not exit
    with status zero.
    """
    argv = ['gpg']
    if default_key is not None:
        argv += ['--default-key', default_key]
    argv.extend(arguments)
    if os.spawnvp(os.P_WAIT, 'gpg', argv) != 0:
        raise SafeException("Command '%s' failed" % argv)
def sign_unsigned(path, data, key, callback):
    """Save 'data' to 'path' without signing it.

    The data is written to a temporary file which is then renamed over
    'path'. 'key' is accepted only so the signature matches sign_xml and is
    not used. 'callback', if true, is called with no arguments once the file
    is in place.
    """
    replacement = write_tmp(path, data)
    os.rename(replacement, path)
    if callback:
        callback()
def sign_xml(path, data, key, callback):
    """Sign 'data' with GPG key 'key' and save it, signature appended, to 'path'.

    This is a generator: it yields an InputBlocker each time it needs to wait
    for gpg's status output, so it is presumably meant to be driven by the
    rox.tasks scheduler (confirm against the callers).

    gpg produces a detached signature of a temporary copy of the data. If gpg
    reports NEED_PASSPHRASE and no agent is advertised through the
    GPG_AGENT_INFO environment variable, the passphrase is requested with the
    'get_passphrase' dialog and sent to gpg's stdin. If the user dismisses
    the dialog, gpg is terminated and the generator finishes without saving
    and without calling 'callback'.

    On success the signature is appended to the data as a
    '<!-- Base64 Signature ... -->' comment, the result is renamed over
    'path', and 'callback' (if true) is called with no arguments.

    Any exception raised while signing (including gpg exiting with a
    non-zero status) is re-raised once the pipe and temporary file have been
    cleaned up.
    """
    import main
    wTree = g.glade.XML(main.gladefile, 'get_passphrase')
    box = wTree.get_widget('get_passphrase')
    box.set_default_response(g.RESPONSE_OK)
    entry = wTree.get_widget('passphrase')

    status_lines = LineBuffer()

    killed = False
    error = False
    tmp = None
    r, w = os.pipe()
    try:
        def setup_child():
            # Runs in the gpg child process: it only needs the write end.
            os.close(r)

        tmp = write_tmp(path, data)
        sigtmp = tmp + '.sig'

        # Use .get(): the variable is normally absent when no agent is
        # running, and a plain lookup would raise KeyError and abort signing
        # in exactly the case where we need to prompt for the passphrase.
        hasAgent = os.environ.get("GPG_AGENT_INFO")
        child = subprocess.Popen(('gpg', '--default-key', key,
                                  '--detach-sign', '--status-fd', str(w),
                                  '--command-fd', '0',
                                  '--no-tty',
                                  '--output', sigtmp,
                                  '--use-agent',
                                  '-q',
                                  tmp),
                                 preexec_fn=setup_child,
                                 stdin=subprocess.PIPE)

        # Close our copy of the write end so that reading 'r' reports
        # end-of-file once gpg exits.
        os.close(w)
        w = None
        while True:
            readable = InputBlocker(r)
            yield readable
            msg = os.read(r, 100)
            if not msg:
                break
            status_lines.add(msg)
            for command in status_lines:
                if command.startswith('[GNUPG:] NEED_PASSPHRASE ') and not hasAgent:
                    entry.set_text('')
                    box.present()
                    resp = box.run()
                    box.hide()
                    if resp == g.RESPONSE_OK:
                        child.stdin.write(entry.get_text() + '\n')
                        child.stdin.flush()
                    else:
                        os.kill(child.pid, signal.SIGTERM)
                        killed = True

        status = child.wait()
        if status:
            raise Exception("GPG failed with exit code %d" % status)
    except:
        # No generator finally blocks in Python 2.4...
        error = True

    # Do not catch any exception between here and the bare 'raise' below: it
    # re-raises whatever was caught above.
    if r is not None:
        os.close(r)
    if w is not None:
        os.close(w)
    if tmp is not None:
        os.unlink(tmp)

    # Check 'killed' first: terminating gpg also makes it exit non-zero, but
    # a user cancel is not an error.
    if killed:
        return
    if error:
        raise

    sig_stream = open(sigtmp)
    try:
        encoded = base64.encodestring(sig_stream.read())
    finally:
        sig_stream.close()
    os.unlink(sigtmp)
    sig = "<!-- Base64 Signature\n" + encoded + "\n-->\n"
    os.rename(write_tmp(path, data + sig), path)

    if callback:
        callback()
def export_key(dir, fingerprint):
    """Export the public key with the given fingerprint into directory 'dir'.

    The key is written ASCII-armoured to '<dir>/<keyID>.gpg', where keyID is
    the key ID gpg reports for 'fingerprint'.

    Returns the path of the new file, or None if that file already exists
    (it is then left untouched).

    Raises Exception if gpg fails, reports no key or more than one key for
    the fingerprint, or exports nothing.
    """
    assert fingerprint is not None

    # Convert fingerprint to key ID. gpg is run with an argument list rather
    # than through a shell, so the fingerprint is never interpreted as shell
    # syntax.
    child = subprocess.Popen(('gpg', '--with-colons', '--list-keys', fingerprint),
                             stdout=subprocess.PIPE)
    listing, _ = child.communicate()
    if child.returncode:
        raise Exception("GPG failed with exit code %d" % child.returncode)

    keyID = None
    for line in listing.split('\n'):
        parts = line.split(':')
        if parts[0] == 'pub':
            if keyID:
                raise Exception('Two key IDs returned from GPG!')
            keyID = parts[4]
    if not keyID:
        raise Exception("GPG returned no key ID for fingerprint '%s'" % fingerprint)

    key_file = os.path.join(dir, keyID + '.gpg')
    if os.path.isfile(key_file):
        return None

    # Collect the whole export before creating the file, so that a failed
    # export never leaves an empty or partial key file behind (which would
    # also make later calls return None, because the file exists).
    child = subprocess.Popen(('gpg', '-a', '--export', fingerprint),
                             stdout=subprocess.PIPE)
    exported, _ = child.communicate()
    if child.returncode:
        raise Exception("GPG failed with exit code %d" % child.returncode)
    if not exported:
        raise Exception("GPG exported nothing for fingerprint '%s'" % fingerprint)

    key_stream = open(key_file, 'w')
    try:
        key_stream.write(exported)
    finally:
        key_stream.close()
    return key_file
220 return key_file