Support lzma/xz when using non-GNU tar
[zeroinstall/zeroinstall-afb.git] / tests / testdownload.py
blob046f7c9f19ba1d069f8f254712bc537d8b4a984b
#!/usr/bin/env python
from __future__ import with_statement
from basetest import BaseTest
import sys, tempfile, os
from StringIO import StringIO
import unittest, signal
from logging import getLogger, WARN, ERROR
from contextlib import contextmanager

# Put the parent directory first on the import path.
# NOTE(review): presumably so the in-tree zeroinstall package is the one
# imported below -- confirm.
sys.path.insert(0, '..')

# Must be set before the zeroinstall imports that follow.
# NOTE(review): presumably this makes requests for any host (for example
# example.com:8000) reach the scripted test server on localhost:8000 -- confirm.
os.environ["http_proxy"] = "localhost:8000"

from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
from zeroinstall.injector.policy import Policy
# Store._add_with_helper is replaced by a stub that always returns False.
from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir, tasks
from zeroinstall.injector import fetch
import data
import my_dbus

# Key information is looked up on a local URL instead of the default server.
fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'

import server
# Flag flipped to True by raise_gui(); tests reset it and then check it to
# find out whether the GUI hook was invoked.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI was requested; all arguments are ignored."""
    global ran_gui
    ran_gui = True
# Test doubles for the background updater: launching the GUI only records
# the request (see raise_gui), and _detach always reports False.
background._exec_gui = raise_gui
background._detach = lambda: False
@contextmanager
def output_suppressed():
    """Swap sys.stdout and sys.stderr for fresh StringIO buffers while the
    ``with`` body runs, restoring the previous streams afterwards.

    Exceptions derived from Exception propagate unchanged.  Any other
    BaseException (SystemExit, for instance) is replaced by a plain
    Exception whose message is the original type and text, so that it
    cannot terminate the test run.
    """
    saved_out, saved_err = sys.stdout, sys.stderr
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        try:
            yield
        except Exception:
            # Ordinary failures pass straight through.
            raise
        except BaseException:
            # Don't abort unit-tests if someone raises SystemExit
            caught = sys.exc_info()[1]
            raise Exception(" ".join([str(type(caught)), str(caught)]))
    finally:
        sys.stdout, sys.stderr = saved_out, saved_err
class Reply:
    """Minimal stdin stand-in: every readline() call yields the same text."""

    def __init__(self, reply):
        # The canned answer handed back by readline().
        self.reply = reply

    def readline(self):
        """Return the canned answer; it is never consumed."""
        return self.reply
def download_and_execute(policy, prog_args, main = None):
    """Solve and download for `policy`, then execute the resulting selections.

    policy    -- object providing solve_and_download_impls(), solver and config
    prog_args -- arguments passed on to run.execute_selections
    main      -- optional `main` override passed on to run.execute_selections

    If solve_and_download_impls() returns a true value, it is waited for
    through policy.config.handler.wait_for_blocker() before executing.
    """
    pending = policy.solve_and_download_impls()
    if pending:
        policy.config.handler.wait_for_blocker(pending)

    chosen = policy.solver.selections
    run.execute_selections(
        chosen,
        prog_args,
        stores = policy.config.stores,
        main = main)
class NetworkManager:
    """Stub NetworkManager service object that always reports "connected"."""

    _CONNECTED = 3  # NM_STATUS_CONNECTED

    def state(self):
        """Return the NM_STATUS_CONNECTED code (3)."""
        return self._CONNECTED
68 class TestDownload(BaseTest):
69 def setUp(self):
70 BaseTest.setUp(self)
72 self.config.handler.allow_downloads = True
73 self.config.fetcher = fetch.Fetcher(self.config)
75 stream = tempfile.TemporaryFile()
76 stream.write(data.thomas_key)
77 stream.seek(0)
78 gpg.import_key(stream)
79 self.child = None
81 trust.trust_db.watchers = []
83 def tearDown(self):
84 BaseTest.tearDown(self)
85 if self.child is not None:
86 os.kill(self.child, signal.SIGTERM)
87 os.waitpid(self.child, 0)
88 self.child = None
90 def testRejectKey(self):
91 with output_suppressed():
92 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
93 policy = Policy('http://localhost:8000/Hello', config = self.config)
94 assert policy.need_download()
95 sys.stdin = Reply("N\n")
96 try:
97 download_and_execute(policy, ['Hello'])
98 assert 0
99 except model.SafeException, ex:
100 if "has no usable implementations" not in str(ex):
101 raise ex
102 if "Not signed with a trusted key" not in str(policy.handler.ex):
103 raise ex
104 self.config.handler.ex = None
106 def testRejectKeyXML(self):
107 with output_suppressed():
108 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
109 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
110 assert policy.need_download()
111 sys.stdin = Reply("N\n")
112 try:
113 download_and_execute(policy, ['Hello'])
114 assert 0
115 except model.SafeException, ex:
116 if "has no usable implementations" not in str(ex):
117 raise ex
118 if "Not signed with a trusted key" not in str(policy.handler.ex):
119 raise
120 self.config.handler.ex = None
122 def testImport(self):
123 from zeroinstall.injector import cli
125 rootLogger = getLogger()
126 rootLogger.disabled = True
127 try:
128 try:
129 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
130 assert 0
131 except model.SafeException, ex:
132 assert 'NO-SUCH-FILE' in str(ex)
133 finally:
134 rootLogger.disabled = False
135 rootLogger.setLevel(WARN)
137 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
138 self.assertEquals(None, hello)
140 with output_suppressed():
141 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
142 sys.stdin = Reply("Y\n")
144 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
145 cli.main(['--import', 'Hello'])
146 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
148 # Check we imported the interface after trusting the key
149 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
150 self.assertEquals(1, len(hello.implementations))
152 # Shouldn't need to prompt the second time
153 sys.stdin = None
154 cli.main(['--import', 'Hello'])
156 def testSelections(self):
157 from zeroinstall.injector import cli
158 root = qdom.parse(file("selections.xml"))
159 sels = selections.Selections(root)
160 class Options: dry_run = False
162 with output_suppressed():
163 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
164 sys.stdin = Reply("Y\n")
165 try:
166 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
167 assert False
168 except NotStored:
169 pass
170 cli.main(['--download-only', 'selections.xml'])
171 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
172 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
174 assert sels.download_missing(self.config) is None
176 def testHelpers(self):
177 from zeroinstall import helpers
179 with output_suppressed():
180 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
181 sys.stdin = Reply("Y\n")
182 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml')
183 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
184 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
185 assert sels.download_missing(self.config) is None
187 def testSelectionsWithFeed(self):
188 from zeroinstall.injector import cli
189 root = qdom.parse(file("selections.xml"))
190 sels = selections.Selections(root)
192 with output_suppressed():
193 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
194 sys.stdin = Reply("Y\n")
196 self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
198 cli.main(['--download-only', 'selections.xml'], config = self.config)
199 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
200 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
202 assert sels.download_missing(self.config) is None
204 def testAcceptKey(self):
205 with output_suppressed():
206 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
207 policy = Policy('http://localhost:8000/Hello', config = self.config)
208 assert policy.need_download()
209 sys.stdin = Reply("Y\n")
210 try:
211 download_and_execute(policy, ['Hello'], main = 'Missing')
212 assert 0
213 except model.SafeException, ex:
214 if "HelloWorld/Missing" not in str(ex):
215 raise
217 def testDistro(self):
218 with output_suppressed():
219 native_url = 'http://example.com:8000/Native.xml'
221 # Initially, we don't have the feed at all...
222 master_feed = self.config.iface_cache.get_feed(native_url)
223 assert master_feed is None, master_feed
225 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
226 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
227 policy = Policy(native_url, config = self.config)
228 assert policy.need_download()
230 solve = policy.solve_with_downloads()
231 self.config.handler.wait_for_blocker(solve)
232 tasks.check(solve)
234 master_feed = self.config.iface_cache.get_feed(native_url)
235 assert master_feed is not None
236 assert master_feed.implementations == {}
238 distro_feed_url = master_feed.get_distro_feed()
239 assert distro_feed_url is not None
240 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
241 assert distro_feed is not None
242 assert len(distro_feed.implementations) == 2, distro_feed.implementations
244 def testWrongSize(self):
245 with output_suppressed():
246 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
247 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
248 policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
249 assert policy.need_download()
250 sys.stdin = Reply("Y\n")
251 try:
252 download_and_execute(policy, ['Hello'], main = 'Missing')
253 assert 0
254 except model.SafeException, ex:
255 if "Downloaded archive has incorrect size" not in str(ex):
256 raise ex
258 def testRecipe(self):
259 old_out = sys.stdout
260 try:
261 sys.stdout = StringIO()
262 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
263 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
264 try:
265 download_and_execute(policy, [])
266 assert False
267 except model.SafeException, ex:
268 if "HelloWorld/Missing" not in str(ex):
269 raise ex
270 finally:
271 sys.stdout = old_out
273 def testSymlink(self):
274 old_out = sys.stdout
275 try:
276 sys.stdout = StringIO()
277 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
278 policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
279 try:
280 download_and_execute(policy, [])
281 assert False
282 except model.SafeException, ex:
283 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
284 raise
285 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
286 finally:
287 sys.stdout = old_out
289 def testAutopackage(self):
290 old_out = sys.stdout
291 try:
292 sys.stdout = StringIO()
293 self.child = server.handle_requests('HelloWorld.autopackage')
294 policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
295 try:
296 download_and_execute(policy, [])
297 assert False
298 except model.SafeException, ex:
299 if "HelloWorld/Missing" not in str(ex):
300 raise
301 finally:
302 sys.stdout = old_out
304 def testRecipeFailure(self):
305 old_out = sys.stdout
306 try:
307 sys.stdout = StringIO()
308 self.child = server.handle_requests('*')
309 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
310 try:
311 download_and_execute(policy, [])
312 assert False
313 except download.DownloadError, ex:
314 if "Connection" not in str(ex):
315 raise
316 finally:
317 sys.stdout = old_out
319 def testMirrors(self):
320 old_out = sys.stdout
321 try:
322 sys.stdout = StringIO()
323 getLogger().setLevel(ERROR)
324 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
325 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
326 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
327 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
329 refreshed = policy.solve_with_downloads()
330 policy.handler.wait_for_blocker(refreshed)
331 assert policy.ready
332 finally:
333 sys.stdout = old_out
335 def testReplay(self):
336 old_out = sys.stdout
337 try:
338 sys.stdout = StringIO()
339 getLogger().setLevel(ERROR)
340 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
341 mtime = int(os.stat('Hello-new.xml').st_mtime)
342 self.config.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
344 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
345 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
346 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
347 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
349 # Update from mirror (should ignore out-of-date timestamp)
350 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
351 policy.handler.wait_for_blocker(refreshed)
353 # Update from upstream (should report an error)
354 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
355 try:
356 policy.handler.wait_for_blocker(refreshed)
357 raise Exception("Should have been rejected!")
358 except model.SafeException, ex:
359 assert "New feed's modification time is before old version" in str(ex)
361 # Must finish with the newest version
362 self.assertEquals(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
363 finally:
364 sys.stdout = old_out
366 def testBackground(self, verbose = False):
367 p = Policy('http://example.com:8000/Hello.xml', config = self.config)
368 self.import_feed(p.root, 'Hello.xml')
369 p.freshness = 0
370 p.network_use = model.network_minimal
371 p.solver.solve(p.root, arch.get_host_architecture())
372 assert p.ready, p.solver.get_failure_reason()
374 @tasks.async
375 def choose_download(registed_cb, nid, actions):
376 try:
377 assert actions == ['download', 'Download'], actions
378 registed_cb(nid, 'download')
379 except:
380 import traceback
381 traceback.print_exc()
382 yield None
384 global ran_gui
385 ran_gui = False
386 old_out = sys.stdout
387 try:
388 sys.stdout = StringIO()
389 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
390 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
391 my_dbus.user_callback = choose_download
392 pid = os.getpid()
393 old_exit = os._exit
394 def my_exit(code):
395 # The background handler runs in the same process
396 # as the tests, so don't let it abort.
397 if os.getpid() == pid:
398 raise SystemExit(code)
399 # But, child download processes are OK
400 old_exit(code)
401 key_info = fetch.DEFAULT_KEY_LOOKUP_SERVER
402 fetch.DEFAULT_KEY_LOOKUP_SERVER = None
403 try:
404 try:
405 os._exit = my_exit
406 background.spawn_background_update(p, verbose)
407 assert False
408 except SystemExit, ex:
409 self.assertEquals(1, ex.code)
410 finally:
411 os._exit = old_exit
412 fetch.DEFAULT_KEY_LOOKUP_SERVER = key_info
413 finally:
414 sys.stdout = old_out
415 assert ran_gui
417 def testBackgroundVerbose(self):
418 self.testBackground(verbose = True)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()