Added Selection.get_path()
[zeroinstall/solver.git] / tests / testdownload.py
blob7745c2706dff88db6b3baaedb2b464426c589a6d
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.requirements import Requirements
16 from zeroinstall.injector.driver import Driver
17 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
18 from zeroinstall.support import basedir, tasks
19 from zeroinstall.injector import fetch
20 import data
21 import my_dbus
23 import server
# Flag flipped by raise_gui(); tests clear it and later check that it was set.
ran_gui = False


def raise_gui(*args):
    """Record that a GUI launch was requested instead of starting one.

    Any positional arguments are accepted and ignored; the only effect is
    setting the module-level ``ran_gui`` flag to True.
    """
    global ran_gui
    ran_gui = True
# Patch the background updater for the tests:
# - _detach is replaced by a stub that always returns False
#   (presumably so the updater stays in the test process -- confirm against
#   zeroinstall.injector.background);
# - _exec_gui is replaced by raise_gui, which only sets the ran_gui flag.
background._detach = lambda: False
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Hide everything written to stdout/stderr while the block runs.

    Both streams are pointed at throw-away StringIO buffers and are always
    put back on exit. Ordinary exceptions propagate unchanged; anything that
    is only a BaseException (e.g. SystemExit) is re-raised as a plain
    Exception whose message is the original type followed by its text.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stand-in for sys.stdin that always answers with the same line.

    Only readline() is provided; every call returns the string given to the
    constructor, so the canned answer never runs out.
    """

    def __init__(self, reply):
        # The text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply (unchanged, on every call)."""
        return self.reply
def download_and_execute(driver, prog_args, main=None):
    """Solve, wait for any downloads, then run the selected program.

    driver    -- object providing solve_and_download_impls(), .solver and
                 .config (a Driver in the tests below)
    prog_args -- argument list passed on to run.execute_selections()
    main      -- optional alternative main, passed through unchanged
    """
    pending = driver.solve_and_download_impls()
    if pending:
        # Something has to be fetched first; block until it has finished.
        tasks.wait_for_blocker(pending)

    chosen = driver.solver.selections
    stores = driver.config.stores
    run.execute_selections(chosen, prog_args, stores=stores, main=main)
class NetworkManager:
    """Fake NetworkManager service object that always reports state 3."""

    def state(self):
        """Return the fixed network state used by the tests."""
        # 3 == NM_STATUS_CONNECTED
        return 3
# PID of the running test server child, or None when no server is running.
server_process = None


def kill_server_process():
    """Terminate the test server child process, if one is running.

    Sends SIGTERM to the recorded pid and waits for it to exit. Does nothing
    when no server is recorded.

    The recorded pid is cleared *before* signalling. If os.kill() or
    os.waitpid() raises (for example because the child has already gone),
    the error still propagates, but the stale pid no longer makes every
    later run_server() call fail its "no server running" assertion.
    """
    global server_process
    if server_process is None:
        return

    pid = server_process
    server_process = None
    os.kill(pid, signal.SIGTERM)
    os.waitpid(pid, 0)
def run_server(*args):
    """Start the test server and remember the value it returns.

    All arguments are forwarded to server.handle_requests(); its result is
    stored in the module-level ``server_process`` (used as a pid by
    kill_server_process()). Only one server may be recorded at a time.
    """
    global server_process
    # A previous server must have been cleaned up before starting another.
    assert server_process is None
    started = server.handle_requests(*args)
    server_process = started
80 class TestDownload(BaseTest):
81 def setUp(self):
82 BaseTest.setUp(self)
84 self.config.handler.allow_downloads = True
85 self.config.key_info_server = 'http://localhost:3333/key-info'
87 self.config.fetcher = fetch.Fetcher(self.config)
89 stream = tempfile.TemporaryFile()
90 stream.write(data.thomas_key)
91 stream.seek(0)
92 gpg.import_key(stream)
93 stream.close()
95 trust.trust_db.watchers = []
97 def tearDown(self):
98 BaseTest.tearDown(self)
99 kill_server_process()
101 def testRejectKey(self):
102 with output_suppressed():
103 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
104 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
105 assert driver.need_download()
106 sys.stdin = Reply("N\n")
107 try:
108 download_and_execute(driver, ['Hello'])
109 assert 0
110 except model.SafeException as ex:
111 if "has no usable implementations" not in str(ex):
112 raise ex
113 if "Not signed with a trusted key" not in str(self.config.handler.ex):
114 raise ex
115 self.config.handler.ex = None
117 def testRejectKeyXML(self):
118 with output_suppressed():
119 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
120 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
121 assert driver.need_download()
122 sys.stdin = Reply("N\n")
123 try:
124 download_and_execute(driver, ['Hello'])
125 assert 0
126 except model.SafeException as ex:
127 if "has no usable implementations" not in str(ex):
128 raise ex
129 if "Not signed with a trusted key" not in str(self.config.handler.ex):
130 raise
131 self.config.handler.ex = None
133 def testImport(self):
134 from zeroinstall.injector import cli
136 rootLogger = getLogger()
137 rootLogger.disabled = True
138 try:
139 try:
140 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
141 assert 0
142 except model.SafeException as ex:
143 assert 'NO-SUCH-FILE' in str(ex)
144 finally:
145 rootLogger.disabled = False
146 rootLogger.setLevel(WARN)
148 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
149 self.assertEqual(None, hello)
151 with output_suppressed():
152 run_server('6FCF121BE2390E0B.gpg')
153 sys.stdin = Reply("Y\n")
155 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 cli.main(['--import', 'Hello'], config = self.config)
157 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
159 # Check we imported the interface after trusting the key
160 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
161 self.assertEqual(1, len(hello.implementations))
163 self.assertEqual(None, hello.local_path)
165 # Shouldn't need to prompt the second time
166 sys.stdin = None
167 cli.main(['--import', 'Hello'], config = self.config)
169 def testSelections(self):
170 from zeroinstall.injector import cli
171 root = qdom.parse(open("selections.xml"))
172 sels = selections.Selections(root)
173 class Options: dry_run = False
175 with output_suppressed():
176 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
177 sys.stdin = Reply("Y\n")
178 try:
179 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
180 assert False
181 except NotStored:
182 pass
183 cli.main(['--download-only', 'selections.xml'], config = self.config)
184 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
185 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
187 assert sels.download_missing(self.config) is None
189 def testHelpers(self):
190 from zeroinstall import helpers
192 with output_suppressed():
193 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
194 sys.stdin = Reply("Y\n")
195 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
196 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
197 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
198 assert sels.download_missing(self.config) is None
200 def testSelectionsWithFeed(self):
201 from zeroinstall.injector import cli
202 root = qdom.parse(open("selections.xml"))
203 sels = selections.Selections(root)
205 with output_suppressed():
206 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
207 sys.stdin = Reply("Y\n")
209 tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
211 cli.main(['--download-only', 'selections.xml'], config = self.config)
212 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
213 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
215 assert sels.download_missing(self.config) is None
217 def testAcceptKey(self):
218 with output_suppressed():
219 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
220 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
221 assert driver.need_download()
222 sys.stdin = Reply("Y\n")
223 try:
224 download_and_execute(driver, ['Hello'], main = 'Missing')
225 assert 0
226 except model.SafeException as ex:
227 if "HelloWorld/Missing" not in str(ex):
228 raise
230 def testAutoAcceptKey(self):
231 self.config.auto_approve_keys = True
232 with output_suppressed():
233 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
234 driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
235 assert driver.need_download()
236 sys.stdin = Reply("")
237 try:
238 download_and_execute(driver, ['Hello'], main = 'Missing')
239 assert 0
240 except model.SafeException as ex:
241 if "HelloWorld/Missing" not in str(ex):
242 raise
244 def testDistro(self):
245 with output_suppressed():
246 native_url = 'http://example.com:8000/Native.xml'
248 # Initially, we don't have the feed at all...
249 master_feed = self.config.iface_cache.get_feed(native_url)
250 assert master_feed is None, master_feed
252 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
253 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
254 driver = Driver(requirements = Requirements(native_url), config = self.config)
255 assert driver.need_download()
257 solve = driver.solve_with_downloads()
258 tasks.wait_for_blocker(solve)
259 tasks.check(solve)
261 master_feed = self.config.iface_cache.get_feed(native_url)
262 assert master_feed is not None
263 assert master_feed.implementations == {}
265 distro_feed_url = master_feed.get_distro_feed()
266 assert distro_feed_url is not None
267 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
268 assert distro_feed is not None
269 assert len(distro_feed.implementations) == 2, distro_feed.implementations
271 def testWrongSize(self):
272 with output_suppressed():
273 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
274 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
275 driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
276 assert driver.need_download()
277 sys.stdin = Reply("Y\n")
278 try:
279 download_and_execute(driver, ['Hello'], main = 'Missing')
280 assert 0
281 except model.SafeException as ex:
282 if "Downloaded archive has incorrect size" not in str(ex):
283 raise ex
285 def testRecipe(self):
286 old_out = sys.stdout
287 try:
288 sys.stdout = StringIO()
289 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
290 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
291 try:
292 download_and_execute(driver, [])
293 assert False
294 except model.SafeException as ex:
295 if "HelloWorld/Missing" not in str(ex):
296 raise ex
297 finally:
298 sys.stdout = old_out
300 def testSymlink(self):
301 old_out = sys.stdout
302 try:
303 sys.stdout = StringIO()
304 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
305 driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
306 try:
307 download_and_execute(driver, [])
308 assert False
309 except model.SafeException as ex:
310 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
311 raise
312 self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
313 finally:
314 sys.stdout = old_out
316 def testAutopackage(self):
317 old_out = sys.stdout
318 try:
319 sys.stdout = StringIO()
320 run_server('HelloWorld.autopackage')
321 driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
322 try:
323 download_and_execute(driver, [])
324 assert False
325 except model.SafeException as ex:
326 if "HelloWorld/Missing" not in str(ex):
327 raise
328 finally:
329 sys.stdout = old_out
331 def testRecipeFailure(self):
332 old_out = sys.stdout
333 try:
334 sys.stdout = StringIO()
335 run_server('*')
336 driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
337 try:
338 download_and_execute(driver, [])
339 assert False
340 except download.DownloadError as ex:
341 if "Connection" not in str(ex):
342 raise
343 finally:
344 sys.stdout = old_out
346 def testMirrors(self):
347 old_out = sys.stdout
348 try:
349 sys.stdout = StringIO()
350 getLogger().setLevel(ERROR)
351 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
352 run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
353 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
354 self.config.feed_mirror = 'http://example.com:8000/0mirror'
356 refreshed = driver.solve_with_downloads()
357 tasks.wait_for_blocker(refreshed)
358 assert driver.solver.ready
359 finally:
360 sys.stdout = old_out
362 def testReplay(self):
363 old_out = sys.stdout
364 try:
365 sys.stdout = StringIO()
366 getLogger().setLevel(ERROR)
367 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
368 mtime = int(os.stat('Hello-new.xml').st_mtime)
369 self.config.iface_cache.update_feed_from_network(iface.uri, open('Hello-new.xml').read(), mtime + 10000)
371 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
372 run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
373 driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
374 self.config.feed_mirror = 'http://example.com:8000/0mirror'
376 # Update from mirror (should ignore out-of-date timestamp)
377 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
378 tasks.wait_for_blocker(refreshed)
380 # Update from upstream (should report an error)
381 refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
382 try:
383 tasks.wait_for_blocker(refreshed)
384 raise Exception("Should have been rejected!")
385 except model.SafeException as ex:
386 assert "New feed's modification time is before old version" in str(ex)
388 # Must finish with the newest version
389 self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
390 finally:
391 sys.stdout = old_out
393 def testBackground(self, verbose = False):
394 r = Requirements('http://example.com:8000/Hello.xml')
395 d = Driver(requirements = r, config = self.config)
396 self.import_feed(r.interface_uri, 'Hello.xml')
397 self.config.freshness = 0
398 self.config.network_use = model.network_minimal
399 d.solver.solve(r.interface_uri, arch.get_host_architecture())
400 assert d.solver.ready, d.solver.get_failure_reason()
402 @tasks.async
403 def choose_download(registed_cb, nid, actions):
404 try:
405 assert actions == ['download', 'Download'], actions
406 registed_cb(nid, 'download')
407 except:
408 import traceback
409 traceback.print_exc()
410 yield None
412 global ran_gui
413 ran_gui = False
414 old_out = sys.stdout
415 try:
416 sys.stdout = StringIO()
417 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
418 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
419 my_dbus.user_callback = choose_download
420 pid = os.getpid()
421 old_exit = os._exit
422 def my_exit(code):
423 # The background handler runs in the same process
424 # as the tests, so don't let it abort.
425 if os.getpid() == pid:
426 raise SystemExit(code)
427 # But, child download processes are OK
428 old_exit(code)
429 from zeroinstall.injector import config
430 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
431 config.DEFAULT_KEY_LOOKUP_SERVER = None
432 try:
433 try:
434 os._exit = my_exit
435 background.spawn_background_update(d, verbose)
436 assert False
437 except SystemExit as ex:
438 self.assertEqual(1, ex.code)
439 finally:
440 os._exit = old_exit
441 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
442 finally:
443 sys.stdout = old_out
444 assert ran_gui
446 def testBackgroundVerbose(self):
447 self.testBackground(verbose = True)
# Run the tests when executed as a script. unittest.main() normally ends by
# raising SystemExit, so the finally clause is what makes sure a server left
# running by an interrupted test is stopped before the interpreter exits.
if __name__ == '__main__':
    try:
        unittest.main()
    finally:
        kill_server_process()