Refactored code as required for the new 0publish
[zeroinstall/zeroinstall-limyreth.git] / tests / testdownload.py
blob6480eb8116f057150fb17ee7ba43d5dae794d850
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 import server
# Module-level flag: flipped to True by raise_gui() so a test can check
# afterwards whether a GUI launch was requested.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI was asked for; every argument is ignored."""
    global ran_gui
    ran_gui = True
# Patch the background updater for the tests: a GUI request is only recorded
# via raise_gui(), and _detach() always reports False.
# NOTE(review): presumably a False result from _detach() keeps the update in
# the test process - confirm against zeroinstall.injector.background.
background._exec_gui = raise_gui
background._detach = lambda: False
@contextmanager
def output_suppressed():
    """Context manager that swallows stdout/stderr for the enclosed block.

    Both streams are pointed at throw-away StringIO buffers and are always
    restored on exit.  Ordinary exceptions propagate unchanged; anything that
    is only a BaseException (e.g. SystemExit) is re-raised as a plain
    Exception so that it cannot abort the unit-test run.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Canned stand-in for sys.stdin: readline() always gives the same text."""

    def __init__(self, reply):
        # Text handed back, unchanged, by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the stored reply; it is never consumed."""
        canned = self.reply
        return canned
def download_and_execute(policy, prog_args, main = None):
    """Solve, fetch whatever is missing, then run the selected program.

    policy    -- object providing solve_and_download_impls(), solver and config
    prog_args -- argument list passed through to run.execute_selections()
    main      -- optional alternative main, passed through unchanged
    """
    blocker = policy.solve_and_download_impls()
    if blocker:
        # Something has to be downloaded: block until it has finished.
        policy.config.handler.wait_for_blocker(blocker)

    chosen = policy.solver.selections
    run.execute_selections(chosen, prog_args, stores = policy.config.stores, main = main)
class NetworkManager:
    """Fake NetworkManager service object; testBackground registers it with my_dbus."""

    def state(self):
        """Always report the connected state."""
        connected = 3	# NM_STATUS_CONNECTED
        return connected
class TestDownload(BaseTest):
    """Download tests driven through the scripted local test server.

    Most tests start a helper process with server.handle_requests(), passing
    the sequence of requests it should serve, and store the returned process
    id in self.child so that tearDown() can terminate and reap it.
    """

    def setUp(self):
        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        # Import the test signing key into GPG.
        stream = tempfile.TemporaryFile()
        stream.write(data.thomas_key)
        stream.seek(0)
        gpg.import_key(stream)
        self.child = None

        trust.trust_db.watchers = []

    def tearDown(self):
        # Always terminate and reap the server child, even when the base-class
        # clean-up fails; otherwise the process would outlive the test.
        try:
            BaseTest.tearDown(self)
        finally:
            if self.child is not None:
                os.kill(self.child, signal.SIGTERM)
                os.waitpid(self.child, 0)
                self.child = None

    def testRejectKey(self):
        # Scripted stdin reply is "N": the run must fail with "has no usable
        # implementations" and the handler must have recorded the
        # "Not signed with a trusted key" error.
        with output_suppressed():
            self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        # Same as testRejectKey, but for the Hello.xml feed on example.com:8000.
        with output_suppressed():
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                self.config.handler.ex = None

    def testImport(self):
        from zeroinstall.injector import cli

        # Importing a missing file must fail; logging is disabled meanwhile.
        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEquals(None, hello)

        with output_suppressed():
            self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEquals(1, len(hello.implementations))

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    def testSelections(self):
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            # The implementation must not be in the store before the download.
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        from zeroinstall import helpers

        with output_suppressed():
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            # Fetch and import the feed first, then download from the selections.
            self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        # Scripted stdin reply is "Y": the download goes ahead and only the
        # deliberately wrong main ('Missing') makes the run fail.
        with output_suppressed():
            self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        # As testAcceptKey, but with auto_approve_keys set and an empty reply.
        self.config.auto_approve_keys = True
        with output_suppressed():
            self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy(native_url, config = self.config)
            assert policy.need_download()

            solve = policy.solve_with_downloads()
            self.config.handler.wait_for_blocker(solve)
            tasks.check(solve)

            # The master feed itself lists no implementations...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            # ...while its distribution feed is expected to list exactly two.
            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        with output_suppressed():
            self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                            '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise

    def testImplementationGenerateMissingId(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tgz'))

            from zeroinstall.zerostore import manifest
            alg = manifest.get_algorithm('sha1')
            assert alg

            from zeroinstall.injector.reader import load_feed
            feed = load_feed(os.path.abspath('ImplementationNoId.xml'), True, False, False, alg, self.config)

            expected_id = 'sha1=3ce644dc725f1d21cfcf02562c76f375944b266a'
            assert feed.implementations[expected_id]
            assert feed.implementations[expected_id].id == expected_id
        finally:
            sys.stdout = old_out

    def testArchiveGenerateMissingSize(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tgz'))

            from zeroinstall.injector.reader import load_feed
            feed = load_feed(os.path.abspath('MissingSize.xml'), True, False, True, None, self.config)

            expected_id = 'sha1=3ce644dc725f1d21cfcf02562c76f375944b266a'
            assert feed.implementations[expected_id].download_sources[0].size == 176
        finally:
            sys.stdout = old_out

    def testRecipe(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
            policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeUnpack(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('doubly_packed.tar'))
            policy = Policy(os.path.abspath('Unpack.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testSymlink(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            # Nothing may have been written to the implementation cache.
            self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('HelloWorld.autopackage')
            policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('*')
            policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testMirrors(self):
        # The server is scripted to answer /Hello.xml with server.Give404,
        # and a feed mirror is configured; the solve must still become ready.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            refreshed = policy.solve_with_downloads()
            policy.handler.wait_for_blocker(refreshed)
            assert policy.ready
        finally:
            sys.stdout = old_out

    def testReplay(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            # Seed the cache with the newer feed before contacting the server.
            with open('Hello-new.xml') as stream:
                new_xml = stream.read()
            self.config.iface_cache.update_feed_from_network(iface.uri, new_xml, mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            policy.handler.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                policy.handler.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEquals(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    def testBackground(self, verbose = False):
        global ran_gui

        p = Policy('http://example.com:8000/Hello.xml', config = self.config)
        self.import_feed(p.root, 'Hello.xml')
        p.freshness = 0
        p.network_use = model.network_minimal
        p.solver.solve(p.root, arch.get_host_architecture())
        assert p.ready, p.solver.get_failure_reason()

        # "async" is a reserved word from Python 3.7 on, so the decorator is
        # looked up by name instead of being written as an attribute access.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except:
                # Deliberately broad: just report whatever went wrong in the
                # callback, then carry on to the yield below.
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download
            pid = os.getpid()
            old_exit = os._exit
            def my_exit(code):
                # The background handler runs in the same process
                # as the tests, so don't let it abort.
                if os.getpid() == pid:
                    raise SystemExit(code)
                # But, child download processes are OK
                old_exit(code)
            from zeroinstall.injector import config
            key_info = config.DEFAULT_KEY_LOOKUP_SERVER
            config.DEFAULT_KEY_LOOKUP_SERVER = None
            try:
                try:
                    os._exit = my_exit
                    background.spawn_background_update(p, verbose)
                    assert False
                except SystemExit as ex:
                    self.assertEquals(1, ex.code)
            finally:
                # Undo both global patches whatever happened above.
                os._exit = old_exit
                config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        self.testBackground(verbose = True)
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()