Fixed version regex
[0publish-gui.git] / signing.py
blob9a51172462bd79f3944b079224dec89fbcb05898
1 from zeroinstall import SafeException
2 from zeroinstall.injector import gpg
3 import tempfile, os, base64, shutil, signal
4 import subprocess
5 from rox import tasks, g
def _current_umask():
    """Return the process umask, leaving it unchanged.

    os.umask() only reports the previous mask as a side effect of installing
    a new one, so a throwaway mask is set and the old one put straight back.
    """
    previous = os.umask(0)
    os.umask(previous)
    return previous

# Process umask, captured once at import time.
umask = _current_umask()
class LineBuffer:
    """Accumulate text arriving in arbitrary chunks and hand it back line by line.

    Iterating yields every complete ('\\n'-terminated) line received so far,
    without the terminator, and removes it from the buffer. A trailing
    partial line stays in 'data' until the rest of it has been added.
    """

    def __init__(self):
        # Text received but not yet returned as a complete line.
        self.data = ''

    def add(self, new):
        """Append the non-empty chunk 'new' to the buffered text."""
        assert new
        self.data = self.data + new

    def __iter__(self):
        """Yield and consume each complete line currently in the buffer."""
        newline = self.data.find('\n')
        while newline != -1:
            line = self.data[:newline]
            # Drop the line and its terminator before handing it out.
            self.data = self.data[newline + 1:]
            yield line
            newline = self.data.find('\n')
def _io_callback(src, cond, blocker):
    """I/O watch callback: trigger 'blocker' and ask not to be called again.

    'src' and 'cond' are accepted but not used. The function always returns
    False.
    """
    keep_watching = False
    blocker.trigger()
    return keep_watching
def get_secret_keys():
    """Return the available GPG secret keys as a list of [fingerprint, name] lists.

    Runs 'gpg --list-secret-keys' and waits for it to finish.

    Raises Exception if gpg exits with a non-zero status.
    """
    listing = subprocess.Popen(
        ('gpg', '--list-secret-keys', '--with-colons', '--with-fingerprint'),
        stdout = subprocess.PIPE)
    output, _ = listing.communicate()
    exit_code = listing.wait()
    if exit_code:
        raise Exception("GPG failed with exit code %d" % exit_code)

    # First, collect the fingerprints of the available secret keys: in the
    # colon-separated listing they are field 9 of each 'fpr' record.
    fingerprints = []
    for record in output.split('\n'):
        fields = record.split(':')
        if fields[0] == 'fpr':
            fingerprints.append(fields[9])

    # When listing secret keys, the identity shown may not be the primary
    # identity as selected by the user or shown when verifying a signature.
    # However, the primary identity can be obtained by listing the
    # accompanying public key.
    public_keys = gpg.load_keys(fingerprints)
    return [[fingerprint, public_keys[fingerprint].name]
            for fingerprint in fingerprints]
def check_signature(path):
    """Load the file at 'path' and check any GPG signature on it.

    Returns a tuple (data, sign_fn, fingerprint):
    - data: the file's contents without the signature;
    - sign_fn: the function to use when saving it again (sign_unsigned for
      an unsigned file, sign_xml otherwise);
    - fingerprint: the fingerprint of the first valid signature, or None if
      the file is unsigned.

    Raises Exception if the file is signed but none of its signatures is a
    gpg.ValidSig.
    """
    stream = open(path)
    try:
        data = stream.read()
    finally:
        stream.close()

    xml_comment = data.rfind('\n<!-- Base64 Signature')
    if xml_comment < 0 and not data.startswith('-----BEGIN'):
        # Neither signature format is present.
        return data, sign_unsigned, None

    # Both signed formats are saved back as XML-comment signatures
    # (saving as plain is not supported).
    sign_fn = sign_xml

    signed_stream = open(path)
    try:
        data_stream, sigs = gpg.check_stream(signed_stream)
        try:
            if xml_comment >= 0:
                # Keep everything up to and including the newline that
                # precedes the signature comment.
                data = data[:xml_comment + 1]
            else:
                data = data_stream.read()
        finally:
            data_stream.close()
    finally:
        # Harmless if check_stream handed back this same stream as
        # data_stream: closing a file twice is allowed.
        signed_stream.close()

    for sig in sigs:
        if isinstance(sig, gpg.ValidSig):
            return data, sign_fn, sig.fingerprint

    error = "ERROR: No valid signatures found!\n"
    error += ''.join(["\nGot: %s" % sig for sig in sigs])
    error += '\n\nTo edit it anyway, remove the signature using a text editor.'
    raise Exception(error)
def write_tmp(path, data):
    """Create a temporary file in the same directory as 'path' and write data to it.

    The file is given mode rw-r--r-- (0644) with the bits in the
    module-level 'umask' removed. Returns the temporary file's path.

    If writing or changing the mode fails, the temporary file is deleted
    before the error propagates.
    """
    fd, tmp = tempfile.mkstemp(prefix = 'tmp-', dir = os.path.dirname(path))
    written = False
    try:
        stream = os.fdopen(fd, 'w')
        try:
            stream.write(data)
        finally:
            stream.close()
        # int('644', 8) is octal 0644, spelled so that it parses on every
        # Python version.
        os.chmod(tmp, int('644', 8) & ~umask)
        written = True
    finally:
        if not written:
            os.unlink(tmp)
    return tmp
def run_gpg(default_key, *arguments):
    """Run 'gpg' with 'arguments' and wait for it to finish.

    If 'default_key' is not None, '--default-key <default_key>' is placed
    before the other arguments.

    Raises SafeException if gpg exits with a non-zero status.
    """
    command = ['gpg']
    if default_key is not None:
        command.extend(['--default-key', default_key])
    command.extend(arguments)

    exit_status = os.spawnvp(os.P_WAIT, 'gpg', command)
    if exit_status:
        raise SafeException("Command '%s' failed" % command)
def sign_unsigned(path, data, key, callback):
    """Save 'data' to 'path' without signing it.

    The data is written to a temporary file in the same directory, which is
    then renamed over 'path'. 'key' is accepted but not used. If 'callback'
    is true, it is called with no arguments once the file is in place.
    """
    replacement = write_tmp(path, data)
    os.rename(replacement, path)
    if callback:
        callback()
def sign_xml(path, data, key, callback):
    """Sign 'data' with GPG key 'key' and save it, plus the signature, to 'path'.

    This is a generator. It yields a tasks.InputBlocker every time it needs
    to wait for more output on gpg's status pipe, so the caller must resume
    it when that blocker triggers (presumably by running it as a rox task;
    confirm against the caller).

    If gpg asks for a passphrase and GPG_AGENT_INFO is not set, the
    'get_passphrase' dialog is shown. If the user does not confirm it, gpg
    is killed and the generator finishes without modifying 'path' and
    without calling 'callback'.

    On success the detached signature is base64-encoded, appended to 'data'
    as a '<!-- Base64 Signature' comment, and the result is renamed over
    'path'. 'callback', if true, is then called with no arguments.

    Raises Exception if gpg exits with a non-zero status.
    """
    import main
    wTree = g.glade.XML(main.gladefile, 'get_passphrase')
    box = wTree.get_widget('get_passphrase')
    box.set_default_response(g.RESPONSE_OK)
    entry = wTree.get_widget('passphrase')

    status_lines = LineBuffer()

    killed = False
    tmp = None
    sigtmp = None
    r, w = os.pipe()

    def release(remove_signature):
        # Close the pipe ends we still hold and delete the temporary copy of
        # the data; optionally delete gpg's output file as well.
        os.close(r)
        if w is not None: os.close(w)
        if tmp is not None: os.unlink(tmp)
        if remove_signature and sigtmp is not None and os.path.exists(sigtmp):
            os.unlink(sigtmp)

    try:
        def setup_child():
            # Runs in the child before gpg is exec'd: the child only writes
            # status lines, so it does not need the read end.
            os.close(r)

        tmp = write_tmp(path, data)
        sigtmp = tmp + '.sig'

        agent_info = os.environ.get("GPG_AGENT_INFO", None)
        child = subprocess.Popen(('gpg', '--default-key', key,
                                  '--detach-sign', '--status-fd', str(w),
                                  '--command-fd', '0',
                                  '--no-tty',
                                  '--output', sigtmp,
                                  '--use-agent',
                                  '-q',
                                  tmp),
                                 preexec_fn = setup_child,
                                 stdin = subprocess.PIPE)

        # Only the child writes to the pipe now; closing our copy of the
        # write end lets us see end-of-file once gpg exits.
        os.close(w)
        w = None
        while True:
            yield tasks.InputBlocker(r)
            msg = os.read(r, 100)
            if not msg: break
            status_lines.add(msg)
            for command in status_lines:
                if command.startswith('[GNUPG:] NEED_PASSPHRASE ') and not agent_info:
                    entry.set_text('')
                    box.present()
                    resp = box.run()
                    box.hide()
                    if resp == g.RESPONSE_OK:
                        child.stdin.write(entry.get_text() + '\n')
                        child.stdin.flush()
                    else:
                        os.kill(child.pid, signal.SIGTERM)
                        killed = True

        status = child.wait()
        if status:
            raise Exception("GPG failed with exit code %d" % status)
    except:
        # No generator finally blocks in Python 2.4, so the cleanup is done
        # here and again on the success path below. Re-raising from inside
        # the handler keeps the bare 'raise' tied to the right exception.
        release(True)
        if killed: return
        raise

    # If gpg was killed but still reported success, treat it as a cancel and
    # discard whatever it wrote.
    release(killed)
    if killed: return

    sig_stream = open(sigtmp, 'rb')
    try:
        encoded = base64.encodestring(sig_stream.read())
    finally:
        sig_stream.close()
    os.unlink(sigtmp)
    sig = "<!-- Base64 Signature\n" + encoded + "\n-->\n"
    os.rename(write_tmp(path, data + sig), path)

    if callback: callback()
def export_key(dir, fingerprint):
    """Export the public key 'fingerprint' to '<dir>/<key ID>.gpg'.

    The key ID is taken from the 'pub' record that gpg lists for the
    fingerprint. Returns the path of the newly written ASCII-armoured key
    file, or None if a file with that name already exists (it is left
    untouched).

    gpg is invoked with an argument list rather than through a shell, so
    the fingerprint is never interpreted by the shell.

    Raises Exception if gpg lists no key or more than one key for the
    fingerprint, or if the export itself fails. A partially written key
    file is removed in that case.
    """
    assert fingerprint is not None
    # Convert fingerprint to key ID
    listing = subprocess.Popen(['gpg', '--with-colons', '--list-keys', fingerprint],
                               stdout = subprocess.PIPE,
                               universal_newlines = True)
    output, _ = listing.communicate()
    keyID = None
    for line in output.split('\n'):
        parts = line.split(':')
        if parts[0] == 'pub':
            if keyID:
                raise Exception('Two key IDs returned from GPG!')
            keyID = parts[4]
    if keyID is None:
        raise Exception("GPG did not list a public key for '%s'" % fingerprint)

    key_file = os.path.join(dir, keyID + '.gpg')
    if os.path.isfile(key_file):
        return None

    exported = False
    try:
        key_stream = open(key_file, 'wb')
        try:
            # gpg writes the armoured key straight into the file.
            status = subprocess.call(['gpg', '-a', '--export', fingerprint],
                                     stdout = key_stream)
        finally:
            key_stream.close()
        if status:
            raise Exception("GPG failed with exit code %d" % status)
        exported = True
    finally:
        # Don't leave an empty or truncated file behind: a later call would
        # mistake it for an already-exported key and return None.
        if not exported and os.path.exists(key_file):
            os.unlink(key_file)
    return key_file