Updated uses of deprecated file() function
[zeroinstall.git] / tests / testdownload.py
blob7acfd83ce1bdc8e315bb806bf855d75cac67181b
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 import server
# Set to True by raise_gui(); tests reset it and then check whether the
# background code tried to launch the GUI.
ran_gui = False


def raise_gui(*args):
    """Stand-in for background._exec_gui: just record that it was called.

    Any arguments passed by the caller are accepted and ignored.
    """
    global ran_gui
    ran_gui = True


# Replace the hooks in the background module with the test doubles:
# _detach always returns False and _exec_gui only sets the ran_gui flag.
background._detach = lambda: False
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Context manager that hides stdout/stderr output for its body.

    While the body runs, sys.stdout and sys.stderr are each replaced by a
    fresh StringIO; the previous streams are always put back on exit.

    Ordinary exceptions (subclasses of Exception) propagate unchanged.
    Anything else derived from BaseException (e.g. SystemExit) is converted
    into a plain Exception whose message is "<type> <message>", so that it
    shows up as a test error instead of aborting the whole test run.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                # Normal errors are passed through untouched.
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stdin replacement that answers every prompt the same way.

    Tests assign an instance to sys.stdin so that code reading a line of
    input receives the canned answer.
    """

    def __init__(self, reply):
        # The text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is never consumed, so repeated calls give the same text."""
        canned_answer = self.reply
        return canned_answer
def download_and_execute(policy, prog_args, main = None):
    """Fetch the implementations chosen by policy, then run the program.

    policy    -- object providing solve_and_download_impls(), .config and .solver
    prog_args -- argument list forwarded to run.execute_selections()
    main      -- forwarded unchanged as the 'main' keyword of execute_selections()

    If solve_and_download_impls() returns a truthy value, it is handed to the
    configured handler's wait_for_blocker() before the program is executed.
    """
    pending = policy.solve_and_download_impls()
    if pending:
        policy.config.handler.wait_for_blocker(pending)

    run.execute_selections(
        policy.solver.selections,
        prog_args,
        stores = policy.config.stores,
        main = main)
class NetworkManager:
    """Fake NetworkManager service object that always claims to be online."""

    def state(self):
        """Return the status code used for "connected"."""
        nm_status_connected = 3    # NM_STATUS_CONNECTED
        return nm_status_connected
# Process id of the helper server started by run_server(), or None when no
# server is running.
server_process = None


def kill_server_process():
    """Stop the helper server recorded in server_process, if there is one.

    Sends SIGTERM to the recorded pid and waits for the child to exit.
    Does nothing when no server is recorded.

    The recorded pid is cleared *before* signalling, so that a failure in
    os.kill() or os.waitpid() (for example because the child has already
    gone) still leaves the module ready for the next run_server() call
    instead of making every later test fail its "no server running" check.
    Any OSError from kill/waitpid is still propagated to the caller.
    """
    global server_process
    if server_process is None:
        return

    pid, server_process = server_process, None
    os.kill(pid, signal.SIGTERM)
    os.waitpid(pid, 0)
def run_server(*args):
    """Start the helper server and remember its process id.

    All arguments are forwarded to server.handle_requests(); whatever it
    returns is stored in the module-level server_process so that
    kill_server_process() can stop it later.

    Only one server may be active at a time: calling this while
    server_process is still set trips the assertion.
    """
    global server_process
    assert server_process is None
    child = server.handle_requests(*args)
    server_process = child
class TestDownload(BaseTest):
    """Download tests that talk to the local helper server started by run_server().

    Each test starts the helper server with the resources it is expected to
    request, feeds canned answers to key-confirmation prompts through
    sys.stdin, and checks the outcome of the download / import / run.
    """

    def setUp(self):
        """Prepare the config for downloading and import the test GPG key."""
        # Tests replace sys.stdin with canned replies; remember the real one
        # so that tearDown can put it back.
        self._saved_stdin = sys.stdin

        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        stream = tempfile.TemporaryFile()
        try:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)
        finally:
            # Don't leave the temporary file open until garbage collection.
            stream.close()

        trust.trust_db.watchers = []

    def tearDown(self):
        """Undo setUp, restore sys.stdin and stop the helper server."""
        try:
            BaseTest.tearDown(self)
        finally:
            # Run the local cleanup even if the base class tearDown fails,
            # otherwise a leftover server would break every following test.
            sys.stdin = self._saved_stdin
            kill_server_process()

    def testRejectKey(self):
        """Answering "N" to the key prompt leaves no usable implementation."""
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                # The error was expected; clear it from the handler.
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        """Same as testRejectKey, but for the Hello.xml feed on example.com."""
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(policy, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(policy.handler.ex):
                    raise
                # The error was expected; clear it from the handler.
                self.config.handler.ex = None

    def testImport(self):
        """--import fails for a missing file and imports a feed once its key is trusted."""
        from zeroinstall.injector import cli

        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEqual(1, len(hello.implementations))

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    def testSelections(self):
        """--download-only on a selections file stores the selected implementation."""
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            # The implementation must not be in the store before the download.
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        """helpers.ensure_cached() downloads the implementation into the store."""
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        """Like testSelections, but the feed is downloaded and imported first."""
        from zeroinstall.injector import cli
        with open("selections.xml") as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        """Answering "Y" lets the download proceed; running then fails on the missing main."""
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        """With auto_approve_keys set, an empty stdin reply is enough to get to the run step."""
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        """A feed with no implementations of its own still yields a distribution feed with two."""
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            policy = Policy(native_url, config = self.config)
            assert policy.need_download()

            solve = policy.solve_with_downloads()
            self.config.handler.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        """An archive whose size differs from the feed is rejected."""
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                       '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(policy, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise

    def testRecipe(self):
        """Recipe.xml downloads both archives; running then fails on the missing main."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
            policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testSymlink(self):
        """Unpacking a directory over a symlink is refused and nothing is cached."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        """Autopackage.xml is downloaded; running then fails on the missing main."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        """A failing download surfaces as a DownloadError mentioning "Connection"."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('*')
            policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
            try:
                download_and_execute(policy, [])
                assert False
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testMirrors(self):
        """When the primary feed URL gives a 404, the policy becomes ready using the mirror."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            refreshed = policy.solve_with_downloads()
            policy.handler.wait_for_blocker(refreshed)
            assert policy.ready
        finally:
            sys.stdout = old_out

    def testReplay(self):
        """An older feed must not replace a newer cached one."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            with open('Hello-new.xml') as stream:
                new_xml = stream.read()
            self.config.iface_cache.update_feed_from_network(iface.uri, new_xml, mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
            self.config.feed_mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            policy.handler.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                policy.handler.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    def testBackground(self, verbose = False):
        """A background update must invoke the GUI hook and finish with exit code 1.

        verbose -- forwarded to background.spawn_background_update()
        """
        global ran_gui

        p = Policy('http://example.com:8000/Hello.xml', config = self.config)
        self.import_feed(p.root, 'Hello.xml')
        p.freshness = 0
        p.network_use = model.network_minimal
        p.solver.solve(p.root, arch.get_host_architecture())
        assert p.ready, p.solver.get_failure_reason()

        # 'async' is a reserved word from Python 3.7 on, so fetch the
        # decorator by name to keep this module parseable there.
        async_task = getattr(tasks, 'async')

        @async_task
        def choose_download(registed_cb, nid, actions):
            # Deliberately broad: report any problem in the callback on
            # stderr rather than letting it escape into the caller.
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except:
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download
            pid = os.getpid()
            old_exit = os._exit
            def my_exit(code):
                # The background handler runs in the same process
                # as the tests, so don't let it abort.
                if os.getpid() == pid:
                    raise SystemExit(code)
                # But, child download processes are OK
                old_exit(code)
            from zeroinstall.injector import config
            key_info = config.DEFAULT_KEY_LOOKUP_SERVER
            config.DEFAULT_KEY_LOOKUP_SERVER = None
            try:
                try:
                    os._exit = my_exit
                    background.spawn_background_update(p, verbose)
                    assert False
                except SystemExit as ex:
                    self.assertEqual(1, ex.code)
            finally:
                os._exit = old_exit
                config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        """Run the background-update test again with verbose output enabled."""
        self.testBackground(verbose = True)
if __name__ == '__main__':
    # Make sure a helper server left running by an interrupted test is
    # stopped however the test run ends.
    try:
        unittest.main()
    finally:
        kill_server_process()