Typo.
[zeroinstall.git] / zeroinstall / injector / gpg.py
blob b08c1a91367657b1c82b226999e9d09d83653963
1 import os
2 import tempfile
3 import traceback
4 from trust import trust_db
5 from model import SafeException
class Signature:
    """Base class describing the outcome of checking a single signature.

    The 'status' attribute stores the value handed to the constructor.
    Subclasses index into it using their own field-position constants.
    """

    # Class-level default so the attribute exists even if __init__ is bypassed.
    status = None

    def __init__(self, status):
        """Remember the status value that describes this signature."""
        self.status = status

    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature.

        The base implementation never needs a key and returns None.
        """
        return None

    def is_trusted(self):
        """Return whether this signature is trusted; the base class never is."""
        return False
class ValidSig(Signature):
    """A signature reported through a VALIDSIG status line."""

    # Position of the signing key's fingerprint within self.status.
    FINGERPRINT = 0

    def is_trusted(self):
        """Return what the trust database says about this fingerprint."""
        fingerprint = self.status[self.FINGERPRINT]
        return trust_db.is_trusted(fingerprint)

    def __str__(self):
        """Return a one-line description naming the signing fingerprint."""
        fingerprint = self.status[self.FINGERPRINT]
        return "Valid signature from " + fingerprint
class BadSig(Signature):
    """A signature reported through a BADSIG status line."""

    # Position of the signing key's ID within self.status.
    KEYID = 0

    def __str__(self):
        """Return a one-line description naming the key and the failure."""
        key_id = self.status[self.KEYID]
        described = "BAD signature by " + key_id
        return described + " (the message has been tampered with)"
class ErrSig(Signature):
    """A signature reported through an ERRSIG status line.

    self.status holds the space-separated fields that followed the ERRSIG
    keyword.
    """

    # Positions of the fields within self.status.
    KEYID = 0
    ALG = 1
    # GnuPG documents ERRSIG as
    #   <keyid> <pkalgo> <hashalgo> <sig_class> <time> <rc> [<fpr>]
    # where the trailing fingerprint is only emitted by newer releases.
    # Counting from the end (-1) would pick up the fingerprint there, so the
    # reason code is addressed by its fixed position instead. For the older
    # six-field form this is the same element as -1.
    RC = 5

    # Reason codes this class knows how to explain.
    _RC_UNKNOWN_ALGORITHM = 4
    _RC_MISSING_KEY = 9

    def _reason_code(self):
        """Return the numeric reason code from the status fields.

        Raises ValueError if the field is not an integer and IndexError if
        the status has too few fields.
        """
        return int(self.status[self.RC])

    def __str__(self):
        """Return a one-line description of why the check failed."""
        key_id = self.status[self.KEYID]
        rc = self._reason_code()
        if rc == self._RC_UNKNOWN_ALGORITHM:
            detail = "Unknown or unsupported algorithm '%s'" % self.status[self.ALG]
        elif rc == self._RC_MISSING_KEY:
            detail = "Unknown key. Try 'gpg --recv-key %s'" % key_id
        else:
            detail = "Unknown reason code %d" % rc
        return ("ERROR signature by %s: " % key_id) + detail

    def need_key(self):
        """Returns the ID of the key that must be downloaded to check this signature.

        Only a missing-key failure (reason code 9) can be resolved by
        downloading a key; every other reason yields None.
        """
        if self._reason_code() == self._RC_MISSING_KEY:
            return self.status[self.KEYID]
        return None
def import_key(stream):
    """Feed 'stream' to 'gpg --quiet --import'.

    stream -- an object with a fileno() method; its descriptor becomes the
              standard input of the gpg child process.

    Raises SafeException if gpg wrote anything to its standard error, or if
    the child did not finish with a zero wait status.
    """
    errors = tempfile.TemporaryFile()
    try:
        child = os.fork()
        if child == 0:
            # We are the child
            try:
                try:
                    os.dup2(stream.fileno(), 0)
                    os.dup2(errors.fileno(), 2)
                    os.execlp('gpg', 'gpg', '--quiet', '--import')
                except:
                    # Deliberately catch everything: the child must never
                    # return into the parent's code. fd 2 already points at
                    # 'errors' (unless dup2 itself failed), so the parent
                    # will see this traceback as an error message.
                    traceback.print_exc()
            finally:
                # Only reached if exec failed; never returns.
                os._exit(1)

        # We are the parent
        pid, status = os.waitpid(child, 0)
        assert pid == child

        errors.seek(0)
        error_messages = errors.read().strip()
    finally:
        # Release the temporary file even if fork/waitpid/read raised.
        errors.close()

    if error_messages:
        raise SafeException("Errors from 'gpg --import':\n%s" % error_messages)

    if status != 0:
        # gpg was killed or exited with an error without printing anything;
        # do not report that as a successful import.
        raise SafeException("'gpg --import' failed (wait status %d) "
                            "without reporting an error" % status)
def _read_signatures(status_stream):
    """Parse the lines gpg wrote to its --status-fd into Signature objects.

    Each line must look like '[GNUPG:] CODE arg1 arg2 ...' and end with a
    newline. VALIDSIG, BADSIG and ERRSIG lines produce a ValidSig, BadSig or
    ErrSig built from the remaining fields; other codes are ignored.

    Raises SafeException if a line does not have the expected form.
    """
    prefix = '[GNUPG:] '
    sig_types = {
        'VALIDSIG': ValidSig,
        'BADSIG': BadSig,
        'ERRSIG': ErrSig,
    }

    sigs = []
    for line in status_stream:
        # This is output from an external program, so validate it with a real
        # exception rather than an assert (asserts vanish under 'python -O').
        if not (line.startswith(prefix) and line.endswith('\n')):
            raise SafeException("Unexpected status line from GPG: %r" % line)
        fields = line[len(prefix):-1].split(' ')
        sig_type = sig_types.get(fields[0])
        if sig_type is not None:
            sigs.append(sig_type(fields[1:]))
    return sigs


def check_stream(stream):
    """Pass stream through gpg --decrypt to get the data, the error text,
    and a list of signatures (good or bad).

    stream -- an object with a fileno() method; its descriptor becomes the
              standard input of the gpg child process.

    Returns (data_stream, [Signatures]). data_stream is a temporary file
    holding gpg's standard output, rewound to the start; the caller owns it.

    Raises SafeException if gpg wrote to standard error and reported no
    signatures at all, or if its status output could not be parsed."""
    status_r, status_w = os.pipe()

    data = tempfile.TemporaryFile()     # Python2.2 does not support 'prefix'
    errors = tempfile.TemporaryFile()

    child = os.fork()

    if child == 0:
        # We are the child
        try:
            try:
                os.close(status_r)
                os.dup2(stream.fileno(), 0)
                os.dup2(data.fileno(), 1)
                os.dup2(errors.fileno(), 2)
                os.execlp('gpg', 'gpg', '--decrypt',
                          # Not all versions support this:
                          #'--max-output', str(1024 * 1024),
                          '--batch',
                          '--status-fd', str(status_w))
            except:
                # Deliberately catch everything: the child must never return
                # into the parent's code.
                traceback.print_exc()
        finally:
            # Only reached if exec failed; never returns.
            os._exit(1)

    # We are the parent
    os.close(status_w)

    try:
        try:
            status_stream = os.fdopen(status_r)
            try:
                # Should we error out on bad signatures, even if there's a
                # good signature too?
                sigs = _read_signatures(status_stream)
            finally:
                try:
                    # Close our end first, so that if parsing stopped early
                    # gpg cannot block writing to a pipe nobody reads.
                    status_stream.close()
                finally:
                    # Always reap the child. Its exit status is not inspected
                    # here; results are taken from the status lines.
                    pid, _status = os.waitpid(child, 0)
            assert pid == child

            errors.seek(0)
            error_messages = errors.read().strip()

            if error_messages and not sigs:
                raise SafeException("No signatures found. Errors from GPG:\n%s" % error_messages)

            data.seek(0)
        except:
            # The caller never receives 'data' on failure, so release it here.
            data.close()
            raise
    finally:
        errors.close()

    return (data, sigs)