Also get GPG keys using the mirror.
[zeroinstall/zeroinstall-mseaborn.git] / tests / testdownload.py
blob f25e794b8c9461c92c8ae0294600fc82cde93025
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, DEBUG, INFO, WARN, ERROR
8 sys.path.insert(0, '..')
10 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler
11 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
12 from zeroinstall.support import basedir
13 import data
15 import server
class Reply:
    """File-like stub, installed as sys.stdin by the tests, that gives a fixed answer."""

    def __init__(self, reply):
        # Text handed back, unchanged, by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; repeated calls keep returning the same text."""
        return self.reply
24 class DummyHandler(handler.Handler):
25 __slots__ = ['ex', 'tb']
27 def __init__(self):
28 handler.Handler.__init__(self)
29 self.ex = None
31 def wait_for_blocker(self, blocker):
32 self.ex = None
33 handler.Handler.wait_for_blocker(self, blocker)
34 if self.ex:
35 raise self.ex, None, self.tb
37 def report_error(self, ex, tb = None):
38 assert self.ex is None, self.ex
39 self.ex = ex
40 self.tb = tb
42 #import traceback
43 #traceback.print_exc()
45 class TestDownload(BaseTest):
46 def setUp(self):
47 BaseTest.setUp(self)
49 stream = tempfile.TemporaryFile()
50 stream.write(data.thomas_key)
51 stream.seek(0)
52 gpg.import_key(stream)
53 self.child = None
55 trust.trust_db.watchers = []
57 def tearDown(self):
58 BaseTest.tearDown(self)
59 if self.child is not None:
60 os.kill(self.child, signal.SIGTERM)
61 os.waitpid(self.child, 0)
62 self.child = None
64 def testRejectKey(self):
65 old_out = sys.stdout
66 try:
67 sys.stdout = StringIO()
68 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
69 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
70 handler = DummyHandler())
71 assert policy.need_download()
72 sys.stdin = Reply("N\n")
73 try:
74 policy.download_and_execute(['Hello'])
75 assert 0
76 except model.SafeException, ex:
77 if "Not signed with a trusted key" not in str(ex):
78 raise ex
79 finally:
80 sys.stdout = old_out
82 def testRejectKeyXML(self):
83 old_out = sys.stdout
84 try:
85 sys.stdout = StringIO()
86 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
87 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
88 handler = DummyHandler())
89 assert policy.need_download()
90 sys.stdin = Reply("N\n")
91 try:
92 policy.download_and_execute(['Hello'])
93 assert 0
94 except model.SafeException, ex:
95 if "Not signed with a trusted key" not in str(ex):
96 raise
97 finally:
98 sys.stdout = old_out
100 def testImport(self):
101 old_out = sys.stdout
102 try:
103 from zeroinstall.injector import cli
104 import logging
106 rootLogger = getLogger()
107 rootLogger.disabled = True
108 try:
109 try:
110 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
111 assert 0
112 except model.SafeException, ex:
113 assert 'NO-SUCH-FILE' in str(ex)
114 finally:
115 rootLogger.disabled = False
116 rootLogger.setLevel(WARN)
118 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
119 self.assertEquals(0, len(hello.implementations))
121 sys.stdout = StringIO()
122 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
123 sys.stdin = Reply("Y\n")
125 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
126 cli.main(['--import', 'Hello'])
127 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
129 # Check we imported the interface after trusting the key
130 reader.update_from_cache(hello)
131 self.assertEquals(1, len(hello.implementations))
133 # Shouldn't need to prompt the second time
134 sys.stdin = None
135 cli.main(['--import', 'Hello'])
136 finally:
137 sys.stdout = old_out
139 def testAcceptKey(self):
140 old_out = sys.stdout
141 try:
142 sys.stdout = StringIO()
143 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
144 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
145 handler = DummyHandler())
146 assert policy.need_download()
147 sys.stdin = Reply("Y\n")
148 try:
149 policy.download_and_execute(['Hello'], main = 'Missing')
150 assert 0
151 except model.SafeException, ex:
152 if "HelloWorld/Missing" not in str(ex):
153 raise ex
154 finally:
155 sys.stdout = old_out
157 def testRecipe(self):
158 old_out = sys.stdout
159 try:
160 sys.stdout = StringIO()
161 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
162 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
163 try:
164 policy.download_and_execute([])
165 assert False
166 except model.SafeException, ex:
167 if "HelloWorld/Missing" not in str(ex):
168 raise ex
169 finally:
170 sys.stdout = old_out
172 def testSymlink(self):
173 old_out = sys.stdout
174 try:
175 sys.stdout = StringIO()
176 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
177 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
178 handler = DummyHandler())
179 try:
180 policy.download_and_execute([])
181 assert False
182 except model.SafeException, ex:
183 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
184 raise ex
185 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
186 finally:
187 sys.stdout = old_out
189 def testAutopackage(self):
190 old_out = sys.stdout
191 try:
192 sys.stdout = StringIO()
193 self.child = server.handle_requests('HelloWorld.autopackage')
194 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
195 try:
196 policy.download_and_execute([])
197 assert False
198 except model.SafeException, ex:
199 if "HelloWorld/Missing" not in str(ex):
200 raise
201 finally:
202 sys.stdout = old_out
204 def testRecipeFailure(self):
205 old_out = sys.stdout
206 try:
207 sys.stdout = StringIO()
208 self.child = server.handle_requests('*')
209 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
210 handler = DummyHandler())
211 try:
212 policy.download_and_execute([])
213 assert False
214 except download.DownloadError, ex:
215 if "Connection" not in str(ex):
216 raise
217 finally:
218 sys.stdout = old_out
220 def testMirrors(self):
221 old_out = sys.stdout
222 try:
223 sys.stdout = StringIO()
224 getLogger().setLevel(ERROR)
225 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
226 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
227 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
228 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
230 refreshed = policy.solve_with_downloads([])
231 errors = policy.handler.wait_for_blocker(refreshed)
232 if errors:
233 raise model.SafeException("Errors during download: " + '\n'.join(errors))
234 assert policy.ready
235 finally:
236 sys.stdout = old_out
# Module-level suite holding every test method of TestDownload.
suite = unittest.TestLoader().loadTestsFromTestCase(TestDownload)

if __name__ == '__main__':
    # When run directly, add -v to the command line before unittest parses it.
    sys.argv.append('-v')
    unittest.main()