Start development series 1.7-post
[zeroinstall/solver.git] / tests / testdownload.py
blob4463a1dd5d571d327d244f62cb0891db24961b68
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 import server
# Flipped to True by raise_gui(); tests inspect it to see whether the GUI
# launch hook was invoked.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI hook was called; all arguments are ignored."""
    global ran_gui
    ran_gui = True


# Patch the background module for the tests: _exec_gui becomes the recording
# stub above and _detach becomes a stub that always returns False.
background._exec_gui = raise_gui
background._detach = lambda: False
@contextmanager
def output_suppressed():
    """Replace sys.stdout and sys.stderr with throwaway StringIO buffers
    for the duration of the ``with`` body, restoring the originals on exit.

    Exceptions derived from Exception propagate unchanged.  Any other
    BaseException (e.g. SystemExit) is re-raised as a plain Exception whose
    message is the original exception's type followed by its text.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stdin replacement: every readline() call returns the same
    canned string given at construction time."""

    def __init__(self, reply):
        # The text handed back, unchanged, by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply."""
        canned = self.reply
        return canned
def download_and_execute(policy, prog_args, main=None):
    """Solve and download implementations for *policy*, then run the result.

    policy    -- object providing solve_and_download_impls(), solver and config
    prog_args -- argument list passed through to run.execute_selections()
    main      -- optional 'main' override passed through to
                 run.execute_selections()
    """
    blocker = policy.solve_and_download_impls()
    if blocker:
        # A truthy result is something to wait on before executing.
        tasks.wait_for_blocker(blocker)
    run.execute_selections(
        policy.solver.selections,
        prog_args,
        stores=policy.config.stores,
        main=main)
class NetworkManager:
    """Fake NetworkManager service object whose state() is always 3."""

    def state(self):
        """Return the fixed state code 3 (NM_STATUS_CONNECTED)."""
        nm_status_connected = 3
        return nm_status_connected
# PID of the helper server started by run_server(), or None when no server
# is running.
server_process = None


def kill_server_process():
    """Terminate and reap the helper server process, if one is running.

    Sends SIGTERM to the recorded PID, waits for it to exit and then clears
    the module-level server_process.  Does nothing when no server is recorded.
    """
    global server_process
    if server_process is None:
        return
    os.kill(server_process, signal.SIGTERM)
    os.waitpid(server_process, 0)
    server_process = None
def run_server(*args):
    """Start the helper server via server.handle_requests(*args) and record
    its return value in the module-level server_process.

    Asserts that no server is already recorded.
    """
    global server_process
    assert server_process is None
    new_server = server.handle_requests(*args)
    server_process = new_server
class TestDownload(BaseTest):
    """Feed, key and archive download tests run against the helper server
    started with run_server().

    Test methods are described with ``#`` comments rather than docstrings so
    that unittest's verbose test descriptions stay unchanged.
    """

    def setUp(self):
        # Remember the real stdin: several tests replace sys.stdin with a
        # canned Reply (or None); tearDown() puts it back.
        self._saved_stdin = sys.stdin
        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        # Import data.thomas_key into GPG.  The context manager closes the
        # temporary file even if the import fails.
        with tempfile.TemporaryFile() as stream:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)

        trust.trust_db.watchers = []

    def tearDown(self):
        # Always stop the helper server and restore stdin, even when the base
        # class tearDown raises; otherwise run_server()'s "no server running"
        # assertion would fail in every following test.
        try:
            BaseTest.tearDown(self)
        finally:
            kill_server_process()
            sys.stdin = self._saved_stdin

    def testRejectKey(self):
        # stdin answers "N".  Running must fail with "has no usable
        # implementations" and the handler must have recorded
        # "Not signed with a trusted key".
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://localhost:8000/Hello', config=self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                # The recorded handler error was expected; clear it.
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        # Same as testRejectKey, but for the Hello.xml feed on example.com:8000.
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                # The recorded handler error was expected; clear it.
                self.config.handler.ex = None

    def testImport(self):
        # --import of a missing file must fail; --import of 'Hello' with stdin
        # answering "Y" must leave the key trusted and the feed cached, and a
        # second import must work with no stdin at all.
        from zeroinstall.injector import cli

        root_logger = getLogger()
        root_logger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config=self.config)
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            root_logger.disabled = False
            root_logger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config=self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force=True)
            self.assertEqual(1, len(hello.implementations))

            self.assertEqual(None, hello.local_path)

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config=self.config)

    def testSelections(self):
        # The implementation named in selections.xml is not stored at first;
        # after --download-only it is, and nothing is left to download.
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                self.fail("Expected NotStored")
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config=self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        # helpers.ensure_cached() must return selections whose implementation
        # is stored, with nothing left to download.
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config=self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        # As testSelections, but the feed is downloaded and imported first.
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config=self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        # stdin answers "Y".  Execution with main='Missing' must then fail
        # with an error mentioning "HelloWorld/Missing".
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config=self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main='Missing')
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        # As testAcceptKey, but with auto_approve_keys set and an empty reply
        # on stdin.
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config=self.config)
            assert policy.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(policy, ['Hello'], main='Missing')
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        # After solving with downloads, the master feed has no implementations
        # of its own and its distro feed has exactly two.
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy(native_url, config=self.config)
            assert policy.need_download()

            solve = policy.solve_with_downloads()
            self.config.handler.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        # The download must be rejected with
        # "Downloaded archive has incorrect size".
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                       '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello-wrong-size', config=self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main='Missing')
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise

    def testRecipe(self):
        # Running the Recipe.xml feed must fail with an error mentioning
        # "HelloWorld/Missing".
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
            policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
            try:
                download_and_execute(policy, [])
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testSymlink(self):
        # Unpacking must be refused with the "dir over symlink" error, and
        # nothing may appear in the implementation cache under 'main'.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            policy = Policy(os.path.abspath('RecipeSymlink.xml'), config=self.config)
            try:
                download_and_execute(policy, [])
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        # Running the Autopackage.xml feed must fail with an error mentioning
        # "HelloWorld/Missing".
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            policy = Policy(os.path.abspath('Autopackage.xml'), config=self.config)
            try:
                download_and_execute(policy, [])
                self.fail("Expected SafeException")
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        # With the server started as run_server('*'), the download must fail
        # with a DownloadError mentioning "Connection".
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('*')
            policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
            try:
                download_and_execute(policy, [])
                self.fail("Expected DownloadError")
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testMirrors(self):
        # The server gives a 404 for /Hello.xml; with feed_mirror configured
        # the policy must still become ready.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
            policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            refreshed = policy.solve_with_downloads()
            policy.handler.wait_for_blocker(refreshed)
            assert policy.ready
        finally:
            sys.stdout = old_out

    def testReplay(self):
        # Seed the cache with Hello-new.xml, then fetch the feed twice: the
        # first fetch must succeed, the second must be rejected, and the
        # cached signature date must be 1235911552.
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            with open('Hello-new.xml') as stream:
                new_xml = stream.read()
            self.config.iface_cache.update_feed_from_network(iface.uri, new_xml, mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            policy.handler.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                policy.handler.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    def testBackground(self, verbose=False):
        # background.spawn_background_update() must finish by calling
        # os._exit(1) (intercepted below) and must have invoked the patched
        # GUI hook, which sets ran_gui.
        global ran_gui

        p = Policy('http://example.com:8000/Hello.xml', config=self.config)
        self.import_feed(p.root, 'Hello.xml')
        p.freshness = 0
        p.network_use = model.network_minimal
        p.solver.solve(p.root, arch.get_host_architecture())
        assert p.ready, p.solver.get_failure_reason()

        # This is the tasks.async decorator.  'async' is a reserved word from
        # Python 3.7 on, so it is looked up by name to keep the file parseable.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except Exception:
                # Print failures from inside the callback rather than
                # propagating them.
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download
            pid = os.getpid()
            old_exit = os._exit

            def my_exit(code):
                # The background handler runs in the same process
                # as the tests, so don't let it abort.
                if os.getpid() == pid:
                    raise SystemExit(code)
                # But, child download processes are OK
                old_exit(code)

            from zeroinstall.injector import config
            key_info = config.DEFAULT_KEY_LOOKUP_SERVER
            config.DEFAULT_KEY_LOOKUP_SERVER = None
            try:
                try:
                    os._exit = my_exit
                    background.spawn_background_update(p, verbose)
                    self.fail("Expected SystemExit")
                except SystemExit as ex:
                    self.assertEqual(1, ex.code)
            finally:
                os._exit = old_exit
                config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        # Same scenario as testBackground, with verbose=True.
        self.testBackground(verbose=True)
# Script entry point: run the tests and, however unittest.main() exits,
# stop any helper server that is still running.
if '__main__' == __name__:
    try:
        unittest.main()
    finally:
        kill_server_process()