If http_proxy is set it can cause the download tests to fail
[zeroinstall/zeroinstall-mseaborn.git] / tests / testdownload.py
blob da995b37b0070972f196fe68ce3a79aa84f8570d
#!/usr/bin/env python2.4
"""Download tests for the injector, run against the local test HTTP server."""
from basetest import BaseTest
import sys, tempfile, os, shutil
from StringIO import StringIO
import unittest, signal
from logging import getLogger, DEBUG, INFO, WARN, ERROR

# Make the parent directory importable before the zeroinstall imports below.
sys.path.insert(0, '..')

# If http_proxy is set it can cause the download tests to fail.
os.environ["http_proxy"] = ""

from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
from zeroinstall.zerostore import Store
# Stub out Store._add_with_helper so it always returns False.
# NOTE(review): presumably this stops the tests from invoking a system-wide
# store helper -- confirm against zeroinstall.zerostore.
Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir, tasks
import data
import my_dbus

import server

# Flag flipped by raise_gui(); tests reset it and then check whether the
# GUI launcher was reached.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI launcher was called instead of starting a GUI.

    Any positional arguments are accepted and ignored; the only effect is
    setting the module-level ``ran_gui`` flag to True.
    """
    global ran_gui
    ran_gui = True

# The background updater must not detach or launch a real GUI during tests:
# _detach always reports False and the GUI launcher only records the call.
background._detach = lambda: False
background._exec_gui = raise_gui

# Register the my_dbus stand-in under every dbus module name, so that any
# later "import dbus", "import dbus.glib" or "import dbus.types" resolves to it.
sys.modules['dbus'] = my_dbus
sys.modules['dbus.glib'] = my_dbus
my_dbus.types = my_dbus
sys.modules['dbus.types'] = my_dbus

class Reply:
    """Minimal stand-in for sys.stdin that answers every readline() call
    with the same canned string."""

    def __init__(self, reply):
        # reply: the text returned by each readline() call, e.g. "Y\n".
        self.reply = reply

    def readline(self):
        """Return the canned reply (the same value on every call)."""
        return self.reply

39 class DummyHandler(handler.Handler):
40 __slots__ = ['ex', 'tb']
42 def __init__(self):
43 handler.Handler.__init__(self)
44 self.ex = None
46 def wait_for_blocker(self, blocker):
47 self.ex = None
48 handler.Handler.wait_for_blocker(self, blocker)
49 if self.ex:
50 raise self.ex, None, self.tb
52 def report_error(self, ex, tb = None):
53 assert self.ex is None, self.ex
54 self.ex = ex
55 self.tb = tb
57 #import traceback
58 #traceback.print_exc()
60 class TestDownload(BaseTest):
61 def setUp(self):
62 BaseTest.setUp(self)
64 stream = tempfile.TemporaryFile()
65 stream.write(data.thomas_key)
66 stream.seek(0)
67 gpg.import_key(stream)
68 self.child = None
70 trust.trust_db.watchers = []
72 def tearDown(self):
73 BaseTest.tearDown(self)
74 if self.child is not None:
75 os.kill(self.child, signal.SIGTERM)
76 os.waitpid(self.child, 0)
77 self.child = None
79 def testRejectKey(self):
80 old_out = sys.stdout
81 try:
82 sys.stdout = StringIO()
83 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
84 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
85 handler = DummyHandler())
86 assert policy.need_download()
87 sys.stdin = Reply("N\n")
88 try:
89 policy.download_and_execute(['Hello'])
90 assert 0
91 except model.SafeException, ex:
92 if "Not signed with a trusted key" not in str(ex):
93 raise ex
94 finally:
95 sys.stdout = old_out
97 def testRejectKeyXML(self):
98 old_out = sys.stdout
99 try:
100 sys.stdout = StringIO()
101 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
102 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
103 handler = DummyHandler())
104 assert policy.need_download()
105 sys.stdin = Reply("N\n")
106 try:
107 policy.download_and_execute(['Hello'])
108 assert 0
109 except model.SafeException, ex:
110 if "Not signed with a trusted key" not in str(ex):
111 raise
112 finally:
113 sys.stdout = old_out
115 def testImport(self):
116 old_out = sys.stdout
117 try:
118 from zeroinstall.injector import cli
119 import logging
121 rootLogger = getLogger()
122 rootLogger.disabled = True
123 try:
124 try:
125 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
126 assert 0
127 except model.SafeException, ex:
128 assert 'NO-SUCH-FILE' in str(ex)
129 finally:
130 rootLogger.disabled = False
131 rootLogger.setLevel(WARN)
133 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
134 self.assertEquals(0, len(hello.implementations))
136 sys.stdout = StringIO()
137 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
138 sys.stdin = Reply("Y\n")
140 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
141 cli.main(['--import', 'Hello'])
142 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
144 # Check we imported the interface after trusting the key
145 reader.update_from_cache(hello)
146 self.assertEquals(1, len(hello.implementations))
148 # Shouldn't need to prompt the second time
149 sys.stdin = None
150 cli.main(['--import', 'Hello'])
151 finally:
152 sys.stdout = old_out
154 def testSelections(self):
155 from zeroinstall.injector.cli import _download_missing_selections
156 root = qdom.parse(file("selections.xml"))
157 sels = selections.Selections(root)
158 class Options: dry_run = False
160 old_out = sys.stdout
161 try:
162 sys.stdout = StringIO()
163 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
164 sys.stdin = Reply("Y\n")
165 _download_missing_selections(Options(), sels)
166 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://localhost:8000/Hello.xml'].id)
167 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
169 assert sels.download_missing(iface_cache.iface_cache, None) is None
170 finally:
171 sys.stdout = old_out
173 def testAcceptKey(self):
174 old_out = sys.stdout
175 try:
176 sys.stdout = StringIO()
177 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
178 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
179 handler = DummyHandler())
180 assert policy.need_download()
181 sys.stdin = Reply("Y\n")
182 try:
183 policy.download_and_execute(['Hello'], main = 'Missing')
184 assert 0
185 except model.SafeException, ex:
186 if "HelloWorld/Missing" not in str(ex):
187 raise ex
188 finally:
189 sys.stdout = old_out
191 def testRecipe(self):
192 old_out = sys.stdout
193 try:
194 sys.stdout = StringIO()
195 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
196 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
197 try:
198 policy.download_and_execute([])
199 assert False
200 except model.SafeException, ex:
201 if "HelloWorld/Missing" not in str(ex):
202 raise ex
203 finally:
204 sys.stdout = old_out
206 def testSymlink(self):
207 old_out = sys.stdout
208 try:
209 sys.stdout = StringIO()
210 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
211 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
212 handler = DummyHandler())
213 try:
214 policy.download_and_execute([])
215 assert False
216 except model.SafeException, ex:
217 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
218 raise ex
219 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
220 finally:
221 sys.stdout = old_out
223 def testAutopackage(self):
224 old_out = sys.stdout
225 try:
226 sys.stdout = StringIO()
227 self.child = server.handle_requests('HelloWorld.autopackage')
228 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
229 try:
230 policy.download_and_execute([])
231 assert False
232 except model.SafeException, ex:
233 if "HelloWorld/Missing" not in str(ex):
234 raise
235 finally:
236 sys.stdout = old_out
238 def testRecipeFailure(self):
239 old_out = sys.stdout
240 try:
241 sys.stdout = StringIO()
242 self.child = server.handle_requests('*')
243 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
244 handler = DummyHandler())
245 try:
246 policy.download_and_execute([])
247 assert False
248 except download.DownloadError, ex:
249 if "Connection" not in str(ex):
250 raise
251 finally:
252 sys.stdout = old_out
254 def testMirrors(self):
255 old_out = sys.stdout
256 try:
257 sys.stdout = StringIO()
258 getLogger().setLevel(ERROR)
259 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
260 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
261 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
262 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
264 refreshed = policy.solve_with_downloads()
265 policy.handler.wait_for_blocker(refreshed)
266 assert policy.ready
267 finally:
268 sys.stdout = old_out
270 def testReplay(self):
271 old_out = sys.stdout
272 try:
273 sys.stdout = StringIO()
274 getLogger().setLevel(ERROR)
275 iface = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello.xml')
276 mtime = int(os.stat('Hello-new.xml').st_mtime)
277 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
279 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
280 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
281 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
282 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
284 # Update from mirror (should ignore out-of-date timestamp)
285 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
286 policy.handler.wait_for_blocker(refreshed)
288 # Update from upstream (should report an error)
289 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
290 try:
291 policy.handler.wait_for_blocker(refreshed)
292 raise Exception("Should have been rejected!")
293 except model.SafeException, ex:
294 assert "New interface's modification time is before old version" in str(ex)
296 # Must finish with the newest version
297 self.assertEquals(1209206132, iface_cache.iface_cache._get_signature_date(iface.uri))
298 finally:
299 sys.stdout = old_out
301 def testBackground(self, verbose = False):
302 p = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml')
303 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
304 p.freshness = 0
305 p.network_use = model.network_minimal
306 p.solver.solve(p.root, arch.get_host_architecture())
307 assert p.ready
309 @tasks.async
310 def choose_download(registed_cb, nid, actions):
311 try:
312 assert actions == ['download', 'Download'], actions
313 registed_cb(nid, 'download')
314 except:
315 import traceback
316 traceback.print_exc()
317 yield None
319 global ran_gui
320 ran_gui = False
321 old_out = sys.stdout
322 try:
323 sys.stdout = StringIO()
324 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
325 my_dbus.user_callback = choose_download
326 pid = os.getpid()
327 old_exit = os._exit
328 def my_exit(code):
329 # The background handler runs in the same process
330 # as the tests, so don't let it abort.
331 if os.getpid() == pid:
332 raise SystemExit(code)
333 # But, child download processes are OK
334 old_exit(code)
335 try:
336 try:
337 os._exit = my_exit
338 background.spawn_background_update(p, verbose)
339 assert False
340 except SystemExit, ex:
341 self.assertEquals(1, ex.code)
342 finally:
343 os._exit = old_exit
344 finally:
345 sys.stdout = old_out
346 assert ran_gui
348 def testBackgroundVerbose(self):
349 self.testBackground(verbose = True)
# Suite object collecting every test in TestDownload.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Always run verbosely when this file is executed directly.
    sys.argv.append('-v')
    unittest.main()