Generalise mirror fallback code.
[zeroinstall/zeroinstall-mseaborn.git] / tests / testdownload.py
blob742777c81817136444ce0c52818bec1f242d86eb
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, DEBUG, INFO, WARN, ERROR
8 sys.path.insert(0, '..')
10 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler
11 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
12 from zeroinstall.support import basedir
13 import data
15 import server
class Reply:
    """Canned replacement for sys.stdin used to answer interactive prompts.

    The tests assign an instance to sys.stdin so that code calling
    readline() on it receives a fixed answer such as "Y\\n" or "N\\n".
    """

    def __init__(self, reply):
        # The text handed back, unchanged, by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is not consumed, so every call gives the same text."""
        return self.reply
class DummyHandler(handler.Handler):
    """Test handler that records a reported error and re-raises it.

    Errors passed to report_error() are stored on the instance; after the
    base class has finished waiting in wait_for_blocker(), the stored error
    (if any) is raised so the calling test sees it as an exception.
    """
    __slots__ = ['ex']

    def __init__(self):
        handler.Handler.__init__(self)
        # Most recent error given to report_error(), or None.
        self.ex = None

    def wait_for_blocker(self, blocker):
        """Wait for blocker via the base handler, then raise any reported error.

        Returns whatever the base class's wait_for_blocker() returns, so
        callers that inspect the result (as testMirrors does with the
        policy's handler) behave the same with this handler.
        """
        # Forget any error left over from an earlier wait.
        self.ex = None
        result = handler.Handler.wait_for_blocker(self, blocker)
        if self.ex is not None:
            raise self.ex
        return result

    def report_error(self, ex, tb = None):
        """Remember ex so that wait_for_blocker() can raise it.

        Only one error may be reported per wait; a second report trips the
        assertion, showing the error that was already stored.
        """
        assert self.ex is None, self.ex
        self.ex = ex
        # When debugging an unexpected error, temporarily add
        # "import traceback; traceback.print_exc()" here.
44 class TestDownload(BaseTest):
45 def setUp(self):
46 BaseTest.setUp(self)
48 stream = tempfile.TemporaryFile()
49 stream.write(data.thomas_key)
50 stream.seek(0)
51 gpg.import_key(stream)
52 self.child = None
54 trust.trust_db.watchers = []
56 def tearDown(self):
57 BaseTest.tearDown(self)
58 if self.child is not None:
59 os.kill(self.child, signal.SIGTERM)
60 os.waitpid(self.child, 0)
61 self.child = None
63 def testRejectKey(self):
64 old_out = sys.stdout
65 try:
66 sys.stdout = StringIO()
67 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
68 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
69 handler = DummyHandler())
70 assert policy.need_download()
71 sys.stdin = Reply("N\n")
72 try:
73 policy.download_and_execute(['Hello'])
74 assert 0
75 except model.SafeException, ex:
76 if "Not signed with a trusted key" not in str(ex):
77 raise ex
78 finally:
79 sys.stdout = old_out
81 def testRejectKeyXML(self):
82 old_out = sys.stdout
83 try:
84 sys.stdout = StringIO()
85 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
86 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
87 handler = DummyHandler())
88 assert policy.need_download()
89 sys.stdin = Reply("N\n")
90 try:
91 policy.download_and_execute(['Hello'])
92 assert 0
93 except model.SafeException, ex:
94 if "Not signed with a trusted key" not in str(ex):
95 raise
96 finally:
97 sys.stdout = old_out
99 def testImport(self):
100 old_out = sys.stdout
101 try:
102 from zeroinstall.injector import cli
103 import logging
105 rootLogger = getLogger()
106 rootLogger.disabled = True
107 try:
108 try:
109 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
110 assert 0
111 except model.SafeException, ex:
112 assert 'NO-SUCH-FILE' in str(ex)
113 finally:
114 rootLogger.disabled = False
115 rootLogger.setLevel(WARN)
117 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
118 self.assertEquals(0, len(hello.implementations))
120 sys.stdout = StringIO()
121 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
122 sys.stdin = Reply("Y\n")
124 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
125 cli.main(['--import', 'Hello'])
126 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
128 # Check we imported the interface after trusting the key
129 reader.update_from_cache(hello)
130 self.assertEquals(1, len(hello.implementations))
132 # Shouldn't need to prompt the second time
133 sys.stdin = None
134 cli.main(['--import', 'Hello'])
135 finally:
136 sys.stdout = old_out
138 def testAcceptKey(self):
139 old_out = sys.stdout
140 try:
141 sys.stdout = StringIO()
142 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
143 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
144 handler = DummyHandler())
145 assert policy.need_download()
146 sys.stdin = Reply("Y\n")
147 try:
148 policy.download_and_execute(['Hello'], main = 'Missing')
149 assert 0
150 except model.SafeException, ex:
151 if "HelloWorld/Missing" not in str(ex):
152 raise ex
153 finally:
154 sys.stdout = old_out
156 def testRecipe(self):
157 old_out = sys.stdout
158 try:
159 sys.stdout = StringIO()
160 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
161 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
162 try:
163 policy.download_and_execute([])
164 assert False
165 except model.SafeException, ex:
166 if "HelloWorld/Missing" not in str(ex):
167 raise ex
168 finally:
169 sys.stdout = old_out
171 def testSymlink(self):
172 old_out = sys.stdout
173 try:
174 sys.stdout = StringIO()
175 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
176 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
177 handler = DummyHandler())
178 try:
179 policy.download_and_execute([])
180 assert False
181 except model.SafeException, ex:
182 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
183 raise ex
184 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
185 finally:
186 sys.stdout = old_out
188 def testAutopackage(self):
189 old_out = sys.stdout
190 try:
191 sys.stdout = StringIO()
192 self.child = server.handle_requests('HelloWorld.autopackage')
193 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
194 try:
195 policy.download_and_execute([])
196 assert False
197 except model.SafeException, ex:
198 if "HelloWorld/Missing" not in str(ex):
199 raise
200 finally:
201 sys.stdout = old_out
203 def testRecipeFailure(self):
204 old_out = sys.stdout
205 try:
206 sys.stdout = StringIO()
207 self.child = server.handle_requests('*')
208 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
209 handler = DummyHandler())
210 try:
211 policy.download_and_execute([])
212 assert False
213 except download.DownloadError, ex:
214 if "Connection" not in str(ex):
215 raise
216 finally:
217 sys.stdout = old_out
219 def testMirrors(self):
220 old_out = sys.stdout
221 try:
222 sys.stdout = StringIO()
223 getLogger().setLevel(ERROR)
224 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
225 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '6FCF121BE2390E0B.gpg')
226 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
227 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
229 refreshed = policy.solve_with_downloads([])
230 errors = policy.handler.wait_for_blocker(refreshed)
231 if errors:
232 raise model.SafeException("Errors during download: " + '\n'.join(errors))
233 assert policy.ready
234 finally:
235 sys.stdout = old_out
# Suite containing every test method of TestDownload, exposed at module level.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Run verbosely when executed directly as a script.
    sys.argv += ['-v']
    unittest.main()