Fix some bugs found by pychecker
[zeroinstall.git] / tests / testdownload.py
blobf1051b34267498f97ea5c8eb5cf2c1a34a86faba
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
15 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
16 from zeroinstall.support import basedir, tasks
17 from zeroinstall.injector import fetch
18 import data
19 import my_dbus
21 fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
23 import server
# Flag flipped by raise_gui(); testBackground resets it and then asserts on it.
ran_gui = False


def raise_gui(*_ignored):
    """Stand-in for background._exec_gui: only record that it was called."""
    global ran_gui
    ran_gui = True


# Patch the background updater for the tests: _detach() always returns False
# (presumably so the update stays in the test process -- TODO confirm against
# background.py) and the GUI launcher is replaced by the recorder above.
background._detach = lambda: False
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Context manager that swaps sys.stdout and sys.stderr for StringIO
    buffers for the duration of the block, restoring the originals on exit
    (including when the block raises)."""
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stdin replacement: every readline() call returns the same
    canned answer given at construction time."""

    def __init__(self, reply):
        # The tests pass the full line, newline included (e.g. "Y\n").
        self.reply = reply

    def readline(self):
        """Return the canned answer; it is never consumed."""
        return self.reply
51 class DummyHandler(handler.Handler):
52 __slots__ = ['ex', 'tb']
54 def __init__(self):
55 handler.Handler.__init__(self)
56 self.ex = None
58 def wait_for_blocker(self, blocker):
59 self.ex = None
60 handler.Handler.wait_for_blocker(self, blocker)
61 if self.ex:
62 raise self.ex, None, self.tb
64 def report_error(self, ex, tb = None):
65 assert self.ex is None, self.ex
66 self.ex = ex
67 self.tb = tb
69 #import traceback
70 #traceback.print_exc()
72 class TestDownload(BaseTest):
73 def setUp(self):
74 BaseTest.setUp(self)
76 stream = tempfile.TemporaryFile()
77 stream.write(data.thomas_key)
78 stream.seek(0)
79 gpg.import_key(stream)
80 self.child = None
82 trust.trust_db.watchers = []
84 def tearDown(self):
85 BaseTest.tearDown(self)
86 if self.child is not None:
87 os.kill(self.child, signal.SIGTERM)
88 os.waitpid(self.child, 0)
89 self.child = None
91 def testRejectKey(self):
92 with output_suppressed():
93 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
94 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
95 handler = DummyHandler())
96 assert policy.need_download()
97 sys.stdin = Reply("N\n")
98 try:
99 policy.download_and_execute(['Hello'])
100 assert 0
101 except model.SafeException, ex:
102 if "Can't find all required implementations" not in str(ex):
103 raise ex
104 if "Not signed with a trusted key" not in str(policy.handler.ex):
105 raise ex
107 def testRejectKeyXML(self):
108 with output_suppressed():
109 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
110 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
111 handler = DummyHandler())
112 assert policy.need_download()
113 sys.stdin = Reply("N\n")
114 try:
115 policy.download_and_execute(['Hello'])
116 assert 0
117 except model.SafeException, ex:
118 if "Can't find all required implementations" not in str(ex):
119 raise ex
120 if "Not signed with a trusted key" not in str(policy.handler.ex):
121 raise
123 def testImport(self):
124 from zeroinstall.injector import cli
126 rootLogger = getLogger()
127 rootLogger.disabled = True
128 try:
129 try:
130 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
131 assert 0
132 except model.SafeException, ex:
133 assert 'NO-SUCH-FILE' in str(ex)
134 finally:
135 rootLogger.disabled = False
136 rootLogger.setLevel(WARN)
138 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello')
139 self.assertEquals(None, hello)
141 with output_suppressed():
142 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
143 sys.stdin = Reply("Y\n")
145 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
146 cli.main(['--import', 'Hello'])
147 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
149 # Check we imported the interface after trusting the key
150 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
151 self.assertEquals(1, len(hello.implementations))
153 # Shouldn't need to prompt the second time
154 sys.stdin = None
155 cli.main(['--import', 'Hello'])
157 def testSelections(self):
158 from zeroinstall.injector.cli import _download_missing_selections
159 root = qdom.parse(file("selections.xml"))
160 sels = selections.Selections(root)
161 class Options: dry_run = False
163 with output_suppressed():
164 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
165 sys.stdin = Reply("Y\n")
166 _download_missing_selections(Options(), sels)
167 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
168 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
170 assert sels.download_missing(iface_cache.iface_cache, None) is None
172 def testSelectionsWithFeed(self):
173 from zeroinstall.injector.cli import _download_missing_selections
174 root = qdom.parse(file("selections.xml"))
175 sels = selections.Selections(root)
176 class Options: dry_run = False
178 with output_suppressed():
179 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
180 sys.stdin = Reply("Y\n")
182 from zeroinstall.injector.handler import Handler
183 handler = Handler()
184 fetcher = fetch.Fetcher(handler)
185 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
187 _download_missing_selections(Options(), sels)
188 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
189 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
191 assert sels.download_missing(iface_cache.iface_cache, None) is None
193 def testAcceptKey(self):
194 with output_suppressed():
195 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
196 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
197 handler = DummyHandler())
198 assert policy.need_download()
199 sys.stdin = Reply("Y\n")
200 try:
201 policy.download_and_execute(['Hello'], main = 'Missing')
202 assert 0
203 except model.SafeException, ex:
204 if "HelloWorld/Missing" not in str(ex):
205 raise ex
207 def testDistro(self):
208 with output_suppressed():
209 native_url = 'http://example.com:8000/Native.xml'
211 # Initially, we don't have the feed at all...
212 master_feed = iface_cache.iface_cache.get_feed(native_url)
213 assert master_feed is None, master_feed
215 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
216 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
217 h = DummyHandler()
218 policy = autopolicy.AutoPolicy(native_url, download_only = False, handler = h)
219 assert policy.need_download()
221 solve = policy.solve_with_downloads()
222 h.wait_for_blocker(solve)
223 tasks.check(solve)
225 master_feed = iface_cache.iface_cache.get_feed(native_url)
226 assert master_feed is not None
227 assert master_feed.implementations == {}
229 distro_feed_url = master_feed.get_distro_feed()
230 assert distro_feed_url is not None
231 distro_feed = iface_cache.iface_cache.get_feed(distro_feed_url)
232 assert distro_feed is not None
233 assert len(distro_feed.implementations) == 2, distro_feed.implementations
235 def testWrongSize(self):
236 with output_suppressed():
237 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
238 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
239 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello-wrong-size', download_only = False,
240 handler = DummyHandler())
241 assert policy.need_download()
242 sys.stdin = Reply("Y\n")
243 try:
244 policy.download_and_execute(['Hello'], main = 'Missing')
245 assert 0
246 except model.SafeException, ex:
247 if "Downloaded archive has incorrect size" not in str(ex):
248 raise ex
250 def testRecipe(self):
251 old_out = sys.stdout
252 try:
253 sys.stdout = StringIO()
254 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
255 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
256 try:
257 policy.download_and_execute([])
258 assert False
259 except model.SafeException, ex:
260 if "HelloWorld/Missing" not in str(ex):
261 raise ex
262 finally:
263 sys.stdout = old_out
265 def testSymlink(self):
266 old_out = sys.stdout
267 try:
268 sys.stdout = StringIO()
269 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
270 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
271 handler = DummyHandler())
272 try:
273 policy.download_and_execute([])
274 assert False
275 except model.SafeException, ex:
276 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
277 raise ex
278 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
279 finally:
280 sys.stdout = old_out
282 def testAutopackage(self):
283 old_out = sys.stdout
284 try:
285 sys.stdout = StringIO()
286 self.child = server.handle_requests('HelloWorld.autopackage')
287 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
288 try:
289 policy.download_and_execute([])
290 assert False
291 except model.SafeException, ex:
292 if "HelloWorld/Missing" not in str(ex):
293 raise
294 finally:
295 sys.stdout = old_out
297 def testRecipeFailure(self):
298 old_out = sys.stdout
299 try:
300 sys.stdout = StringIO()
301 self.child = server.handle_requests('*')
302 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
303 handler = DummyHandler())
304 try:
305 policy.download_and_execute([])
306 assert False
307 except download.DownloadError, ex:
308 if "Connection" not in str(ex):
309 raise
310 finally:
311 sys.stdout = old_out
313 def testMirrors(self):
314 old_out = sys.stdout
315 try:
316 sys.stdout = StringIO()
317 getLogger().setLevel(ERROR)
318 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
319 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
320 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
321 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
323 refreshed = policy.solve_with_downloads()
324 policy.handler.wait_for_blocker(refreshed)
325 assert policy.ready
326 finally:
327 sys.stdout = old_out
329 def testReplay(self):
330 old_out = sys.stdout
331 try:
332 sys.stdout = StringIO()
333 getLogger().setLevel(ERROR)
334 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
335 mtime = int(os.stat('Hello-new.xml').st_mtime)
336 iface_cache.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
338 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
339 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
340 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
341 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
343 # Update from mirror (should ignore out-of-date timestamp)
344 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
345 policy.handler.wait_for_blocker(refreshed)
347 # Update from upstream (should report an error)
348 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
349 try:
350 policy.handler.wait_for_blocker(refreshed)
351 raise Exception("Should have been rejected!")
352 except model.SafeException, ex:
353 assert "New feed's modification time is before old version" in str(ex)
355 # Must finish with the newest version
356 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
357 finally:
358 sys.stdout = old_out
360 def testBackground(self, verbose = False):
361 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
362 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
363 p.freshness = 0
364 p.network_use = model.network_minimal
365 p.solver.solve(p.root, arch.get_host_architecture())
366 assert p.ready
368 @tasks.async
369 def choose_download(registed_cb, nid, actions):
370 try:
371 assert actions == ['download', 'Download'], actions
372 registed_cb(nid, 'download')
373 except:
374 import traceback
375 traceback.print_exc()
376 yield None
378 global ran_gui
379 ran_gui = False
380 old_out = sys.stdout
381 try:
382 sys.stdout = StringIO()
383 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
384 my_dbus.user_callback = choose_download
385 pid = os.getpid()
386 old_exit = os._exit
387 def my_exit(code):
388 # The background handler runs in the same process
389 # as the tests, so don't let it abort.
390 if os.getpid() == pid:
391 raise SystemExit(code)
392 # But, child download processes are OK
393 old_exit(code)
394 key_info = fetch.DEFAULT_KEY_LOOKUP_SERVER
395 fetch.DEFAULT_KEY_LOOKUP_SERVER = None
396 try:
397 try:
398 os._exit = my_exit
399 background.spawn_background_update(p, verbose)
400 assert False
401 except SystemExit, ex:
402 self.assertEquals(1, ex.code)
403 finally:
404 os._exit = old_exit
405 fetch.DEFAULT_KEY_LOOKUP_SERVER = key_info
406 finally:
407 sys.stdout = old_out
408 assert ran_gui
410 def testBackgroundVerbose(self):
411 self.testBackground(verbose = True)
413 if __name__ == '__main__':
414 unittest.main()