Updated Policy tests to test Driver instead
[zeroinstall.git] / tests / testdownload.py
blob d77dd34deda1e88839c9d6588f9601b6cc6e1113
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall import helpers
15 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
16 from zeroinstall.injector.requirements import Requirements
17 from zeroinstall.injector.driver import Driver
18 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
19 from zeroinstall.support import basedir, tasks, ro_rmtree
20 from zeroinstall.injector import fetch
21 import data
22 import my_dbus
24 import server
# Flag flipped by raise_gui(); the tests clear it and then check whether the
# GUI entry point was reached.
ran_gui = False


def raise_gui(*_unused):
    """Record that the GUI was requested; every argument is ignored.

    TestDownload.setUp installs this in place of helpers.get_selections_gui.
    """
    global ran_gui
    ran_gui = True
# Replace background._detach with a stub that always returns False.
# NOTE(review): presumably this keeps the background updater inside the test
# process (trapped_exit's comment says the handler runs in-process) -- confirm.
background._detach = lambda: False

# Selections document containing a single local-path selection for Hello.xml.
# testBackgroundApp writes it over an app's selections.xml.
local_hello = """<?xml version="1.0" ?>
<selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
<selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
</selections>"""
@contextmanager
def output_suppressed():
    """Context manager that swallows stdout/stderr for the enclosed block.

    Both streams are pointed at throw-away StringIO buffers and put back on
    exit. Ordinary exceptions propagate unchanged; anything that is only a
    BaseException (e.g. SystemExit) is converted into a plain Exception so
    that it cannot abort the unit-test run.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
@contextmanager
def trapped_exit(expected_exit_status):
    """Context manager asserting that the enclosed block calls os._exit().

    While active, os._exit is replaced: a call made from this process raises
    SystemExit instead of terminating, whereas a call made from any other
    process is forwarded to the real os._exit. The block must exit with
    expected_exit_status; finishing without exiting is an assertion failure.
    The real os._exit is always restored afterwards.
    """
    test_pid = os.getpid()
    real_exit = os._exit

    def exit_or_raise(code):
        # Child download processes are OK: let them really exit.
        if os.getpid() != test_pid:
            return real_exit(code)
        # The background handler runs in the same process
        # as the tests, so don't let it abort.
        raise SystemExit(code)

    os._exit = exit_or_raise
    try:
        yield
    except SystemExit as ex:
        assert ex.code == expected_exit_status
    else:
        # The block was required to exit.
        assert False
    finally:
        os._exit = real_exit
class Reply:
    """File-like stand-in for sys.stdin that always answers with one string."""

    def __init__(self, reply):
        # Text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply (the same value on every call)."""
        canned = self.reply
        return canned
def download_and_execute(driver, prog_args, main = None):
    """Solve and download with driver, then execute the chosen selections.

    driver.solve_and_download_impls() may return a blocker; if it does, wait
    for it before running. prog_args and main are passed straight through to
    run.execute_selections.
    """
    blocker = driver.solve_and_download_impls()
    if blocker:
        tasks.wait_for_blocker(blocker)
    chosen = driver.solver.selections
    run.execute_selections(chosen, prog_args, stores = driver.config.stores, main = main)
class NetworkManager:
    """Fake NetworkManager object registered with my_dbus by the background tests."""

    def state(self):
        """Always report the connected state."""
        connected = 3   # NM_STATUS_CONNECTED
        return connected
# Process object returned by server.handle_requests() (stored by run_server),
# or None while no helper server is running.
server_process = None


def kill_server_process():
    """Kill the helper server process, wait for it, and forget its handle.

    Does nothing when no server process is recorded. On Windows, an
    "Access is denied" error from kill() is tolerated provided the process
    has in fact already finished; any other WindowsError is re-raised.
    """
    global server_process
    if server_process is None:
        return

    # The process may still be running. See
    # http://bugs.python.org/issue14252 for why this is so
    # complicated.
    if os.name != 'nt':
        server_process.kill()
    else:
        try:
            server_process.kill()
        except WindowsError as e:
            # This is what happens when terminate
            # is called after the process has died.
            if e.winerror == 5 and e.strerror == 'Access is denied':
                assert not server_process.poll()
            else:
                raise
    server_process.wait()
    server_process = None
def run_server(*args):
    """Start the helper server via server.handle_requests(*args).

    The returned process is remembered in the module-level server_process so
    that kill_server_process() can stop it. Only one server may be recorded
    at a time.
    """
    global server_process
    assert server_process is None
    started = server.handle_requests(*args)
    server_process = started
120 real_get_selections_gui = helpers.get_selections_gui
122 class TestDownload(BaseTest):
123 def setUp(self):
124 BaseTest.setUp(self)
126 self.config.handler.allow_downloads = True
127 self.config.key_info_server = 'http://localhost:3333/key-info'
129 self.config.fetcher = fetch.Fetcher(self.config)
131 stream = tempfile.TemporaryFile()
132 stream.write(data.thomas_key)
133 stream.seek(0)
134 gpg.import_key(stream)
135 stream.close()
137 trust.trust_db.watchers = []
139 helpers.get_selections_gui = raise_gui
141 global ran_gui
142 ran_gui = False
144 def tearDown(self):
145 helpers.get_selections_gui = real_get_selections_gui
146 BaseTest.tearDown(self)
147 kill_server_process()
149 def testRejectKey(self):
150 with output_suppressed():
151 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
152 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
153 assert driver.need_download()
154 sys.stdin = Reply("N\n")
155 try:
156 download_and_execute(driver, ['Hello'])
157 assert 0
158 except model.SafeException as ex:
159 if "has no usable implementations" not in str(ex):
160 raise ex
161 if "Not signed with a trusted key" not in str(self.config.handler.ex):
162 raise self.config.handler.ex
163 self.config.handler.ex = None
165 def testRejectKeyXML(self):
166 with output_suppressed():
167 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
168 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
169 assert driver.need_download()
170 sys.stdin = Reply("N\n")
171 try:
172 download_and_execute(driver, ['Hello'])
173 assert 0
174 except model.SafeException as ex:
175 if "has no usable implementations" not in str(ex):
176 raise ex
177 if "Not signed with a trusted key" not in str(self.config.handler.ex):
178 raise
179 self.config.handler.ex = None
181 def testImport(self):
182 from zeroinstall.injector import cli
184 rootLogger = getLogger()
185 rootLogger.disabled = True
186 try:
187 try:
188 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
189 assert 0
190 except model.SafeException as ex:
191 assert 'NO-SUCH-FILE' in str(ex)
192 finally:
193 rootLogger.disabled = False
194 rootLogger.setLevel(WARN)
196 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
197 self.assertEqual(None, hello)
199 with output_suppressed():
200 run_server('6FCF121BE2390E0B.gpg')
201 sys.stdin = Reply("Y\n")
203 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
204 cli.main(['--import', 'Hello'], config = self.config)
205 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
207 # Check we imported the interface after trusting the key
208 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
209 self.assertEqual(1, len(hello.implementations))
211 self.assertEqual(None, hello.local_path)
213 # Shouldn't need to prompt the second time
214 sys.stdin = None
215 cli.main(['--import', 'Hello'], config = self.config)
217 def testSelections(self):
218 from zeroinstall.injector import cli
219 root = qdom.parse(open("selections.xml"))
220 sels = selections.Selections(root)
221 class Options: dry_run = False
223 with output_suppressed():
224 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
225 sys.stdin = Reply("Y\n")
226 try:
227 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
228 assert False
229 except NotStored:
230 pass
231 cli.main(['--download-only', 'selections.xml'], config = self.config)
232 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
233 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
235 assert sels.download_missing(self.config) is None
237 def testHelpers(self):
238 from zeroinstall import helpers
240 with output_suppressed():
241 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
242 sys.stdin = Reply("Y\n")
243 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
244 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
245 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
246 assert sels.download_missing(self.config) is None
248 def testSelectionsWithFeed(self):
249 from zeroinstall.injector import cli
250 root = qdom.parse(open("selections.xml"))
251 sels = selections.Selections(root)
253 with output_suppressed():
254 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
255 sys.stdin = Reply("Y\n")
257 tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
259 cli.main(['--download-only', 'selections.xml'], config = self.config)
260 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
261 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
263 assert sels.download_missing(self.config) is None
265 def testAcceptKey(self):
266 with output_suppressed():
267 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
268 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
269 assert driver.need_download()
270 sys.stdin = Reply("Y\n")
271 try:
272 download_and_execute(driver, ['Hello'], main = 'Missing')
273 assert 0
274 except model.SafeException as ex:
275 if "HelloWorld/Missing" not in str(ex):
276 raise
278 def testAutoAcceptKey(self):
279 self.config.auto_approve_keys = True
280 with output_suppressed():
281 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
282 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
283 assert driver.need_download()
284 sys.stdin = Reply("")
285 try:
286 download_and_execute(driver, ['Hello'], main = 'Missing')
287 assert 0
288 except model.SafeException as ex:
289 if "HelloWorld/Missing" not in str(ex):
290 raise
292 def testDistro(self):
293 with output_suppressed():
294 native_url = 'http://example.com:8000/Native.xml'
296 # Initially, we don't have the feed at all...
297 master_feed = self.config.iface_cache.get_feed(native_url)
298 assert master_feed is None, master_feed
300 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
301 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
302 driver = Driver(requirements = Requirements(native_url), config = self.config)
303 assert driver.need_download()
305 solve = driver.solve_with_downloads()
306 tasks.wait_for_blocker(solve)
307 tasks.check(solve)
309 master_feed = self.config.iface_cache.get_feed(native_url)
310 assert master_feed is not None
311 assert master_feed.implementations == {}
313 distro_feed_url = master_feed.get_distro_feed()
314 assert distro_feed_url is not None
315 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
316 assert distro_feed is not None
317 assert len(distro_feed.implementations) == 2, distro_feed.implementations
319 def testWrongSize(self):
320 with output_suppressed():
321 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
322 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
323 driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
324 assert driver.need_download()
325 sys.stdin = Reply("Y\n")
326 try:
327 download_and_execute(driver, ['Hello'], main = 'Missing')
328 assert 0
329 except model.SafeException as ex:
330 if "Downloaded archive has incorrect size" not in str(ex):
331 raise ex
333 def testRecipe(self):
334 old_out = sys.stdout
335 try:
336 sys.stdout = StringIO()
337 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
338 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
339 try:
340 download_and_execute(driver, [])
341 assert False
342 except model.SafeException as ex:
343 if "HelloWorld/Missing" not in str(ex):
344 raise ex
345 finally:
346 sys.stdout = old_out
348 def testSymlink(self):
349 old_out = sys.stdout
350 try:
351 sys.stdout = StringIO()
352 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
353 driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
354 try:
355 download_and_execute(driver, [])
356 assert False
357 except model.SafeException as ex:
358 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
359 raise
360 self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
361 finally:
362 sys.stdout = old_out
364 def testAutopackage(self):
365 old_out = sys.stdout
366 try:
367 sys.stdout = StringIO()
368 run_server('HelloWorld.autopackage')
369 driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
370 try:
371 download_and_execute(driver, [])
372 assert False
373 except model.SafeException as ex:
374 if "HelloWorld/Missing" not in str(ex):
375 raise
376 finally:
377 sys.stdout = old_out
379 def testRecipeFailure(self):
380 old_out = sys.stdout
381 try:
382 sys.stdout = StringIO()
383 run_server('*')
384 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
385 try:
386 download_and_execute(driver, [])
387 assert False
388 except download.DownloadError as ex:
389 if "Connection" not in str(ex):
390 raise
391 finally:
392 sys.stdout = old_out
394 def testMirrors(self):
395 old_out = sys.stdout
396 try:
397 sys.stdout = StringIO()
398 getLogger().setLevel(ERROR)
399 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
400 run_server(server.Give404('/Hello.xml'),
401 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
402 '/0mirror/keys/6FCF121BE2390E0B.gpg')
403 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
404 self.config.feed_mirror = 'http://example.com:8000/0mirror'
406 refreshed = driver.solve_with_downloads()
407 tasks.wait_for_blocker(refreshed)
408 assert driver.solver.ready
409 finally:
410 sys.stdout = old_out
412 def testReplay(self):
413 old_out = sys.stdout
414 try:
415 sys.stdout = StringIO()
416 getLogger().setLevel(ERROR)
417 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
418 mtime = int(os.stat('Hello-new.xml').st_mtime)
419 self.config.iface_cache.update_feed_from_network(iface.uri, open('Hello-new.xml').read(), mtime + 10000)
421 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
422 run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
423 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
424 self.config.feed_mirror = 'http://example.com:8000/0mirror'
426 # Update from mirror (should ignore out-of-date timestamp)
427 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
428 tasks.wait_for_blocker(refreshed)
430 # Update from upstream (should report an error)
431 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
432 try:
433 tasks.wait_for_blocker(refreshed)
434 raise Exception("Should have been rejected!")
435 except model.SafeException as ex:
436 assert "New feed's modification time is before old version" in str(ex)
438 # Must finish with the newest version
439 self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
440 finally:
441 sys.stdout = old_out
443 def testBackground(self, verbose = False):
444 r = Requirements('http://example.com:8000/Hello.xml')
445 d = Driver(requirements = r, config = self.config)
446 self.import_feed(r.interface_uri, 'Hello.xml')
447 self.config.freshness = 0
448 self.config.network_use = model.network_minimal
449 d.solver.solve(r.interface_uri, arch.get_host_architecture())
450 assert d.solver.ready, d.solver.get_failure_reason()
452 @tasks.async
453 def choose_download(registed_cb, nid, actions):
454 try:
455 assert actions == ['download', 'Download'], actions
456 registed_cb(nid, 'download')
457 except:
458 import traceback
459 traceback.print_exc()
460 yield None
462 global ran_gui
463 ran_gui = False
464 os.environ['DISPLAY'] = 'dummy'
465 old_out = sys.stdout
466 try:
467 sys.stdout = StringIO()
468 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
469 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
470 my_dbus.user_callback = choose_download
472 with trapped_exit(1):
473 from zeroinstall.injector import config
474 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
475 config.DEFAULT_KEY_LOOKUP_SERVER = None
476 try:
477 background.spawn_background_update(d, verbose)
478 finally:
479 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
480 finally:
481 sys.stdout = old_out
482 assert ran_gui
484 def testBackgroundVerbose(self):
485 self.testBackground(verbose = True)
487 def testBackgroundApp(self):
488 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
490 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
492 global ran_gui
494 with output_suppressed():
495 # Select a version of Hello
496 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
497 r = Requirements('http://example.com:8000/Hello.xml')
498 driver = Driver(requirements = r, config = self.config)
499 tasks.wait_for_blocker(driver.solve_with_downloads())
500 assert driver.solver.ready
501 kill_server_process()
503 # Save it as an app
504 app = self.config.app_mgr.create_app('test-app', r)
505 app.set_selections(driver.solver.selections)
506 timestamp = os.path.join(app.path, 'last-checked')
507 last_check_attempt = os.path.join(app.path, 'last-check-attempt')
508 selections_path = os.path.join(app.path, 'selections.xml')
510 def reset_timestamps():
511 ran_gui = False
512 os.utime(timestamp, (1, 1)) # 1970
513 os.utime(selections_path, (1, 1))
514 if os.path.exists(last_check_attempt):
515 os.unlink(last_check_attempt)
517 # Download the implementation
518 sels = app.get_selections()
519 run_server('HelloWorld.tgz')
520 tasks.wait_for_blocker(app.download_selections(sels))
521 kill_server_process()
523 # Not time for a background update yet
524 self.config.freshness = 100
525 dl = app.download_selections(sels)
526 assert dl == None
527 assert not ran_gui
529 # Trigger a background update - no updates found
530 reset_timestamps()
531 run_server('Hello.xml')
532 with trapped_exit(1):
533 dl = app.download_selections(sels)
534 assert dl == None
535 assert not ran_gui
536 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
537 self.assertEqual(1, os.stat(selections_path).st_mtime)
538 kill_server_process()
540 # Change the selections
541 sels_path = os.path.join(app.path, 'selections.xml')
542 with open(sels_path) as stream:
543 old = stream.read()
544 with open(sels_path, 'w') as stream:
545 stream.write(old.replace('Hello', 'Goodbye'))
547 # Trigger another background update - metadata changes found
548 reset_timestamps()
549 run_server('Hello.xml')
550 with trapped_exit(1):
551 dl = app.download_selections(sels)
552 assert dl == None
553 assert not ran_gui
554 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
555 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
556 kill_server_process()
558 # Trigger another background update - GUI needed now
560 # Delete cached implementation so we need to download it again
561 stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
562 assert os.path.basename(stored).startswith('sha1')
563 ro_rmtree(stored)
565 # Replace with a valid local feed so we don't have to download immediately
566 with open(sels_path, 'w') as stream:
567 stream.write(local_hello)
568 sels = app.get_selections()
570 os.environ['DISPLAY'] = 'dummy'
571 reset_timestamps()
572 run_server('Hello.xml')
573 with trapped_exit(1):
574 dl = app.download_selections(sels)
575 assert dl == None
576 assert ran_gui # (so doesn't actually update)
577 kill_server_process()
579 # Now again with no DISPLAY
580 reset_timestamps()
581 del os.environ['DISPLAY']
582 run_server('Hello.xml', 'HelloWorld.tgz')
583 with trapped_exit(1):
584 dl = app.download_selections(sels)
585 assert dl == None
586 assert ran_gui # (so doesn't actually update)
588 self.assertNotEqual(1, os.stat(timestamp).st_mtime)
589 self.assertNotEqual(1, os.stat(selections_path).st_mtime)
590 kill_server_process()
592 sels = app.get_selections()
593 sel, = sels.selections.values()
594 self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)
596 # Untrust the key
597 trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
599 os.environ['DISPLAY'] = 'dummy'
600 reset_timestamps()
601 run_server('Hello.xml')
602 with trapped_exit(1):
603 #import logging; logging.getLogger().setLevel(logging.INFO)
604 dl = app.download_selections(sels)
605 assert dl == None
606 assert ran_gui
607 kill_server_process()
609 # Update not triggered because of last-check-attempt
610 ran_gui = False
611 os.utime(timestamp, (1, 1)) # 1970
612 os.utime(selections_path, (1, 1))
613 dl = app.download_selections(sels)
614 assert dl == None
615 assert not ran_gui
# Run the tests when executed as a script; always stop any helper server
# that is still recorded, even if unittest.main() exits or raises.
if __name__ == '__main__':
    try:
        unittest.main()
    finally:
        kill_server_process()