Warn about tar archives with a leading "./" in the name.
[0publish-gui.git] / signing.py
blob96c5e572a8404fce3aba2ccc70bb5cfd7ae3f816
1 from zeroinstall import SafeException
2 from zeroinstall.injector import gpg
3 import tempfile, os, base64, shutil, signal
4 import subprocess
5 from rox import tasks, g
7 umask = os.umask(0)
8 os.umask(umask)
10 class LineBuffer:
11 def __init__(self):
12 self.data = ''
14 def add(self, new):
15 assert new
16 self.data += new
18 def __iter__(self):
19 while '\n' in self.data:
20 command, self.data = self.data.split('\n', 1)
21 yield command
23 def _io_callback(src, cond, blocker):
24 blocker.trigger()
25 return False
27 def get_secret_keys():
28 child = subprocess.Popen(('gpg', '--list-secret-keys', '--with-colons', '--with-fingerprint'),
29 stdout = subprocess.PIPE)
30 stdout, _ = child.communicate()
31 status = child.wait()
32 if status:
33 raise Exception("GPG failed with exit code %d" % status)
34 # First, collect fingerprints for available secret keys
35 keys = []
36 for line in stdout.split('\n'):
37 line = line.split(':')
38 if line[0] == 'fpr':
39 keys.append([line[9], None])
40 # When listing secret keys, the identity shown may not be the primary identity as selected by
41 # the user or shown when verifying a signature. However, the primary identity can be obtained
42 # by listing the accompanying public key.
43 loaded_keys = gpg.load_keys([k[0] for k in keys])
44 for key in keys:
45 key[1] = "{name} - {id}".format(
46 name = loaded_keys[key[0]].name,
47 id = key[0][-8:])
48 return keys
50 def check_signature(path):
51 data = file(path).read()
52 xml_comment = data.rfind('\n<!-- Base64 Signature')
53 if xml_comment >= 0:
54 data_stream, sigs = gpg.check_stream(file(path))
55 sign_fn = sign_xml
56 data = data[:xml_comment + 1]
57 data_stream.close()
58 elif data.startswith('-----BEGIN'):
59 data_stream, sigs = gpg.check_stream(file(path))
60 sign_fn = sign_xml # Don't support saving as plain
61 data = data_stream.read()
62 else:
63 return data, sign_unsigned, None
64 for sig in sigs:
65 if isinstance(sig, gpg.ValidSig):
66 return data, sign_fn, sig.fingerprint
67 error = "ERROR: No valid signatures found!\n"
68 for sig in sigs:
69 error += "\nGot: %s" % sig
70 error += '\n\nTo edit it anyway, remove the signature using a text editor.'
71 raise Exception(error)
73 def write_tmp(path, data):
74 """Create a temporary file in the same directory as 'path' and write data to it."""
75 fd, tmp = tempfile.mkstemp(prefix = 'tmp-', dir = os.path.dirname(path))
76 stream = os.fdopen(fd, 'w')
77 stream.write(data)
78 stream.close()
79 os.chmod(tmp, 0644 &~umask)
80 return tmp
82 def run_gpg(default_key, *arguments):
83 arguments = list(arguments)
84 if default_key is not None:
85 arguments = ['--default-key', default_key] + arguments
86 arguments.insert(0, 'gpg')
87 if os.spawnvp(os.P_WAIT, 'gpg', arguments):
88 raise SafeException("Command '%s' failed" % arguments)
90 def sign_unsigned(path, data, key, callback):
91 os.rename(write_tmp(path, data), path)
92 if callback: callback()
94 def sign_xml(path, data, key, callback):
95 import main
96 wTree = g.glade.XML(main.gladefile, 'get_passphrase')
97 box = wTree.get_widget('get_passphrase')
98 box.set_default_response(g.RESPONSE_OK)
99 entry = wTree.get_widget('passphrase')
101 buffer = LineBuffer()
103 killed = False
104 error = False
105 tmp = None
106 r, w = os.pipe()
107 try:
108 def setup_child():
109 os.close(r)
111 tmp = write_tmp(path, data)
112 sigtmp = tmp + '.sig'
114 agent_info = os.environ.get("GPG_AGENT_INFO", None)
115 child = subprocess.Popen(('gpg', '--default-key', key,
116 '--detach-sign', '--status-fd', str(w),
117 '--command-fd', '0',
118 '--no-tty',
119 '--output', sigtmp,
120 '--use-agent',
121 '-q',
122 tmp),
123 preexec_fn = setup_child,
124 stdin = subprocess.PIPE)
126 os.close(w)
127 w = None
128 while True:
129 input = tasks.InputBlocker(r)
130 yield input
131 msg = os.read(r, 100)
132 if not msg: break
133 buffer.add(msg)
134 for command in buffer:
135 if command.startswith('[GNUPG:] NEED_PASSPHRASE ') and not agent_info:
136 entry.set_text('')
137 box.present()
138 resp = box.run()
139 box.hide()
140 if resp == g.RESPONSE_OK:
141 child.stdin.write(entry.get_text() + '\n')
142 child.stdin.flush()
143 else:
144 os.kill(child.pid, signal.SIGTERM)
145 killed = True
147 status = child.wait()
148 if status:
149 raise Exception("GPG failed with exit code %d" % status)
150 except:
151 # No generator finally blocks in Python 2.4...
152 error = True
154 if r is not None: os.close(r)
155 if w is not None: os.close(w)
156 if tmp is not None: os.unlink(tmp)
158 if killed: return
159 if error: raise
161 encoded = base64.encodestring(file(sigtmp).read())
162 os.unlink(sigtmp)
163 sig = "<!-- Base64 Signature\n" + encoded + "\n-->\n"
164 os.rename(write_tmp(path, data + sig), path)
166 if callback: callback()
168 def export_key(dir, fingerprint):
169 assert fingerprint is not None
170 # Convert fingerprint to key ID
171 stream = os.popen('gpg --with-colons --list-keys %s' % fingerprint)
172 try:
173 keyID = None
174 for line in stream:
175 parts = line.split(':')
176 if parts[0] == 'pub':
177 if keyID:
178 raise Exception('Two key IDs returned from GPG!')
179 keyID = parts[4]
180 finally:
181 stream.close()
182 key_file = os.path.join(dir, keyID + '.gpg')
183 if os.path.isfile(key_file):
184 return None
185 key_stream = file(key_file, 'w')
186 stream = os.popen("gpg -a --export '%s'" % fingerprint)
187 shutil.copyfileobj(stream, key_stream)
188 stream.close()
189 key_stream.close()
190 return key_file