Fixed GUI to display recursive runners
[zeroinstall.git] / tests / testdownload.py
blob88ab0b453336428af17822c1d9f8cd2dfdb263fc
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
15 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
16 from zeroinstall.support import basedir, tasks
17 from zeroinstall.injector import fetch
18 import data
19 import my_dbus
21 fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
23 import server
25 ran_gui = False
26 def raise_gui(*args):
27 global ran_gui
28 ran_gui = True
29 background._detach = lambda: False
30 background._exec_gui = raise_gui
32 @contextmanager
33 def output_suppressed():
34 old_stdout = sys.stdout
35 old_stderr = sys.stderr
36 try:
37 sys.stdout = StringIO()
38 sys.stderr = StringIO()
39 try:
40 yield
41 except Exception:
42 raise
43 except BaseException, ex:
44 # Don't abort unit-tests if someone raises SystemExit
45 raise Exception(str(type(ex)) + " " + str(ex))
46 finally:
47 sys.stdout = old_stdout
48 sys.stderr = old_stderr
50 class Reply:
51 def __init__(self, reply):
52 self.reply = reply
54 def readline(self):
55 return self.reply
57 class DummyHandler(handler.Handler):
58 __slots__ = ['ex', 'tb']
60 def __init__(self):
61 handler.Handler.__init__(self)
62 self.ex = None
64 def wait_for_blocker(self, blocker):
65 self.ex = None
66 handler.Handler.wait_for_blocker(self, blocker)
67 if self.ex:
68 raise self.ex, None, self.tb
70 def report_error(self, ex, tb = None):
71 assert self.ex is None, self.ex
72 self.ex = ex
73 self.tb = tb
75 #import traceback
76 #traceback.print_exc()
78 class NetworkManager:
79 def state(self):
80 return 3 # NM_STATUS_CONNECTED
82 class TestDownload(BaseTest):
83 def setUp(self):
84 BaseTest.setUp(self)
86 stream = tempfile.TemporaryFile()
87 stream.write(data.thomas_key)
88 stream.seek(0)
89 gpg.import_key(stream)
90 self.child = None
92 trust.trust_db.watchers = []
94 def tearDown(self):
95 BaseTest.tearDown(self)
96 if self.child is not None:
97 os.kill(self.child, signal.SIGTERM)
98 os.waitpid(self.child, 0)
99 self.child = None
101 def testRejectKey(self):
102 with output_suppressed():
103 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
104 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
105 handler = DummyHandler())
106 assert policy.need_download()
107 sys.stdin = Reply("N\n")
108 try:
109 policy.download_and_execute(['Hello'])
110 assert 0
111 except model.SafeException, ex:
112 if "has no usable implementations" not in str(ex):
113 raise ex
114 if "Not signed with a trusted key" not in str(policy.handler.ex):
115 raise ex
117 def testRejectKeyXML(self):
118 with output_suppressed():
119 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
120 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
121 handler = DummyHandler())
122 assert policy.need_download()
123 sys.stdin = Reply("N\n")
124 try:
125 policy.download_and_execute(['Hello'])
126 assert 0
127 except model.SafeException, ex:
128 if "has no usable implementations" not in str(ex):
129 raise ex
130 if "Not signed with a trusted key" not in str(policy.handler.ex):
131 raise
133 def testImport(self):
134 from zeroinstall.injector import cli
136 rootLogger = getLogger()
137 rootLogger.disabled = True
138 try:
139 try:
140 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
141 assert 0
142 except model.SafeException, ex:
143 assert 'NO-SUCH-FILE' in str(ex)
144 finally:
145 rootLogger.disabled = False
146 rootLogger.setLevel(WARN)
148 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello')
149 self.assertEquals(None, hello)
151 with output_suppressed():
152 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
153 sys.stdin = Reply("Y\n")
155 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 cli.main(['--import', 'Hello'])
157 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
159 # Check we imported the interface after trusting the key
160 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
161 self.assertEquals(1, len(hello.implementations))
163 # Shouldn't need to prompt the second time
164 sys.stdin = None
165 cli.main(['--import', 'Hello'])
167 def testSelections(self):
168 from zeroinstall.injector.cli import _download_missing_selections
169 root = qdom.parse(file("selections.xml"))
170 sels = selections.Selections(root)
171 class Options: dry_run = False
173 with output_suppressed():
174 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
175 sys.stdin = Reply("Y\n")
176 _download_missing_selections(Options(), sels)
177 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
178 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
180 assert sels.download_missing(iface_cache.iface_cache, None) is None
182 def testHelpers(self):
183 from zeroinstall import helpers
185 with output_suppressed():
186 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
187 sys.stdin = Reply("Y\n")
188 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml')
189 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
190 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
191 assert sels.download_missing(iface_cache.iface_cache, None) is None
193 def testSelectionsWithFeed(self):
194 from zeroinstall.injector.cli import _download_missing_selections
195 root = qdom.parse(file("selections.xml"))
196 sels = selections.Selections(root)
197 class Options: dry_run = False
199 with output_suppressed():
200 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
201 sys.stdin = Reply("Y\n")
203 from zeroinstall.injector.handler import Handler
204 handler = Handler()
205 fetcher = fetch.Fetcher(handler)
206 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
208 _download_missing_selections(Options(), sels)
209 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
210 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
212 assert sels.download_missing(iface_cache.iface_cache, None) is None
214 def testAcceptKey(self):
215 with output_suppressed():
216 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
217 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
218 handler = DummyHandler())
219 assert policy.need_download()
220 sys.stdin = Reply("Y\n")
221 try:
222 policy.download_and_execute(['Hello'], main = 'Missing')
223 assert 0
224 except model.SafeException, ex:
225 if "HelloWorld/Missing" not in str(ex):
226 raise ex
228 def testDistro(self):
229 with output_suppressed():
230 native_url = 'http://example.com:8000/Native.xml'
232 # Initially, we don't have the feed at all...
233 master_feed = iface_cache.iface_cache.get_feed(native_url)
234 assert master_feed is None, master_feed
236 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
237 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
238 h = DummyHandler()
239 policy = autopolicy.AutoPolicy(native_url, download_only = False, handler = h)
240 assert policy.need_download()
242 solve = policy.solve_with_downloads()
243 h.wait_for_blocker(solve)
244 tasks.check(solve)
246 master_feed = iface_cache.iface_cache.get_feed(native_url)
247 assert master_feed is not None
248 assert master_feed.implementations == {}
250 distro_feed_url = master_feed.get_distro_feed()
251 assert distro_feed_url is not None
252 distro_feed = iface_cache.iface_cache.get_feed(distro_feed_url)
253 assert distro_feed is not None
254 assert len(distro_feed.implementations) == 2, distro_feed.implementations
256 def testWrongSize(self):
257 with output_suppressed():
258 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
259 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
260 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello-wrong-size', download_only = False,
261 handler = DummyHandler())
262 assert policy.need_download()
263 sys.stdin = Reply("Y\n")
264 try:
265 policy.download_and_execute(['Hello'], main = 'Missing')
266 assert 0
267 except model.SafeException, ex:
268 if "Downloaded archive has incorrect size" not in str(ex):
269 raise ex
271 def testRecipe(self):
272 old_out = sys.stdout
273 try:
274 sys.stdout = StringIO()
275 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
276 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
277 try:
278 policy.download_and_execute([])
279 assert False
280 except model.SafeException, ex:
281 if "HelloWorld/Missing" not in str(ex):
282 raise ex
283 finally:
284 sys.stdout = old_out
286 def testSymlink(self):
287 old_out = sys.stdout
288 try:
289 sys.stdout = StringIO()
290 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
291 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
292 handler = DummyHandler())
293 try:
294 policy.download_and_execute([])
295 assert False
296 except model.SafeException, ex:
297 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
298 raise
299 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
300 finally:
301 sys.stdout = old_out
303 def testAutopackage(self):
304 old_out = sys.stdout
305 try:
306 sys.stdout = StringIO()
307 self.child = server.handle_requests('HelloWorld.autopackage')
308 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
309 try:
310 policy.download_and_execute([])
311 assert False
312 except model.SafeException, ex:
313 if "HelloWorld/Missing" not in str(ex):
314 raise
315 finally:
316 sys.stdout = old_out
318 def testRecipeFailure(self):
319 old_out = sys.stdout
320 try:
321 sys.stdout = StringIO()
322 self.child = server.handle_requests('*')
323 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
324 handler = DummyHandler())
325 try:
326 policy.download_and_execute([])
327 assert False
328 except download.DownloadError, ex:
329 if "Connection" not in str(ex):
330 raise
331 finally:
332 sys.stdout = old_out
334 def testMirrors(self):
335 old_out = sys.stdout
336 try:
337 sys.stdout = StringIO()
338 getLogger().setLevel(ERROR)
339 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
340 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
341 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
342 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
344 refreshed = policy.solve_with_downloads()
345 policy.handler.wait_for_blocker(refreshed)
346 assert policy.ready
347 finally:
348 sys.stdout = old_out
350 def testReplay(self):
351 old_out = sys.stdout
352 try:
353 sys.stdout = StringIO()
354 getLogger().setLevel(ERROR)
355 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
356 mtime = int(os.stat('Hello-new.xml').st_mtime)
357 iface_cache.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
359 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
360 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
361 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
362 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
364 # Update from mirror (should ignore out-of-date timestamp)
365 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
366 policy.handler.wait_for_blocker(refreshed)
368 # Update from upstream (should report an error)
369 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
370 try:
371 policy.handler.wait_for_blocker(refreshed)
372 raise Exception("Should have been rejected!")
373 except model.SafeException, ex:
374 assert "New feed's modification time is before old version" in str(ex)
376 # Must finish with the newest version
377 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
378 finally:
379 sys.stdout = old_out
381 def testBackground(self, verbose = False):
382 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
383 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
384 p.freshness = 0
385 p.network_use = model.network_minimal
386 p.solver.solve(p.root, arch.get_host_architecture())
387 assert p.ready
389 @tasks.async
390 def choose_download(registed_cb, nid, actions):
391 try:
392 assert actions == ['download', 'Download'], actions
393 registed_cb(nid, 'download')
394 except:
395 import traceback
396 traceback.print_exc()
397 yield None
399 global ran_gui
400 ran_gui = False
401 old_out = sys.stdout
402 try:
403 sys.stdout = StringIO()
404 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
405 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
406 my_dbus.user_callback = choose_download
407 pid = os.getpid()
408 old_exit = os._exit
409 def my_exit(code):
410 # The background handler runs in the same process
411 # as the tests, so don't let it abort.
412 if os.getpid() == pid:
413 raise SystemExit(code)
414 # But, child download processes are OK
415 old_exit(code)
416 key_info = fetch.DEFAULT_KEY_LOOKUP_SERVER
417 fetch.DEFAULT_KEY_LOOKUP_SERVER = None
418 try:
419 try:
420 os._exit = my_exit
421 background.spawn_background_update(p, verbose)
422 assert False
423 except SystemExit, ex:
424 self.assertEquals(1, ex.code)
425 finally:
426 os._exit = old_exit
427 fetch.DEFAULT_KEY_LOOKUP_SERVER = key_info
428 finally:
429 sys.stdout = old_out
430 assert ran_gui
432 def testBackgroundVerbose(self):
433 self.testBackground(verbose = True)
435 if __name__ == '__main__':
436 unittest.main()