Added ability to generate implementation ids on feed creation
[zeroinstall/zeroinstall-limyreth.git] / tests / testdownload.py
blob22b9bc83ee883a8c23d647a8604e259b1485e4f6
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 import server
24 ran_gui = False
25 def raise_gui(*args):
26 global ran_gui
27 ran_gui = True
28 background._detach = lambda: False
29 background._exec_gui = raise_gui
31 @contextmanager
32 def output_suppressed():
33 old_stdout = sys.stdout
34 old_stderr = sys.stderr
35 try:
36 sys.stdout = StringIO()
37 sys.stderr = StringIO()
38 try:
39 yield
40 except Exception:
41 raise
42 except BaseException, ex:
43 # Don't abort unit-tests if someone raises SystemExit
44 raise Exception(str(type(ex)) + " " + str(ex))
45 finally:
46 sys.stdout = old_stdout
47 sys.stderr = old_stderr
49 class Reply:
50 def __init__(self, reply):
51 self.reply = reply
53 def readline(self):
54 return self.reply
56 def download_and_execute(policy, prog_args, main = None):
57 downloaded = policy.solve_and_download_impls()
58 if downloaded:
59 policy.config.handler.wait_for_blocker(downloaded)
60 run.execute_selections(policy.solver.selections, prog_args, stores = policy.config.stores, main = main)
62 class NetworkManager:
63 def state(self):
64 return 3 # NM_STATUS_CONNECTED
66 class TestDownload(BaseTest):
67 def setUp(self):
68 BaseTest.setUp(self)
70 self.config.handler.allow_downloads = True
71 self.config.key_info_server = 'http://localhost:3333/key-info'
73 self.config.fetcher = fetch.Fetcher(self.config)
75 stream = tempfile.TemporaryFile()
76 stream.write(data.thomas_key)
77 stream.seek(0)
78 gpg.import_key(stream)
79 self.child = None
81 trust.trust_db.watchers = []
83 def tearDown(self):
84 BaseTest.tearDown(self)
85 if self.child is not None:
86 os.kill(self.child, signal.SIGTERM)
87 os.waitpid(self.child, 0)
88 self.child = None
90 def testRejectKey(self):
91 with output_suppressed():
92 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
93 policy = Policy('http://localhost:8000/Hello', config = self.config)
94 assert policy.need_download()
95 sys.stdin = Reply("N\n")
96 try:
97 download_and_execute(policy, ['Hello'])
98 assert 0
99 except model.SafeException, ex:
100 if "has no usable implementations" not in str(ex):
101 raise ex
102 if "Not signed with a trusted key" not in str(policy.handler.ex):
103 raise ex
104 self.config.handler.ex = None
106 def testRejectKeyXML(self):
107 with output_suppressed():
108 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
109 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
110 assert policy.need_download()
111 sys.stdin = Reply("N\n")
112 try:
113 download_and_execute(policy, ['Hello'])
114 assert 0
115 except model.SafeException, ex:
116 if "has no usable implementations" not in str(ex):
117 raise ex
118 if "Not signed with a trusted key" not in str(policy.handler.ex):
119 raise
120 self.config.handler.ex = None
122 def testImport(self):
123 from zeroinstall.injector import cli
125 rootLogger = getLogger()
126 rootLogger.disabled = True
127 try:
128 try:
129 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
130 assert 0
131 except model.SafeException, ex:
132 assert 'NO-SUCH-FILE' in str(ex)
133 finally:
134 rootLogger.disabled = False
135 rootLogger.setLevel(WARN)
137 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
138 self.assertEquals(None, hello)
140 with output_suppressed():
141 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
142 sys.stdin = Reply("Y\n")
144 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
145 cli.main(['--import', 'Hello'], config = self.config)
146 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
148 # Check we imported the interface after trusting the key
149 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
150 self.assertEquals(1, len(hello.implementations))
152 # Shouldn't need to prompt the second time
153 sys.stdin = None
154 cli.main(['--import', 'Hello'], config = self.config)
156 def testSelections(self):
157 from zeroinstall.injector import cli
158 root = qdom.parse(file("selections.xml"))
159 sels = selections.Selections(root)
160 class Options: dry_run = False
162 with output_suppressed():
163 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
164 sys.stdin = Reply("Y\n")
165 try:
166 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
167 assert False
168 except NotStored:
169 pass
170 cli.main(['--download-only', 'selections.xml'], config = self.config)
171 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
172 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
174 assert sels.download_missing(self.config) is None
176 def testHelpers(self):
177 from zeroinstall import helpers
179 with output_suppressed():
180 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
181 sys.stdin = Reply("Y\n")
182 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
183 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
184 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
185 assert sels.download_missing(self.config) is None
187 def testSelectionsWithFeed(self):
188 from zeroinstall.injector import cli
189 root = qdom.parse(file("selections.xml"))
190 sels = selections.Selections(root)
192 with output_suppressed():
193 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
194 sys.stdin = Reply("Y\n")
196 self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
198 cli.main(['--download-only', 'selections.xml'], config = self.config)
199 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
200 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
202 assert sels.download_missing(self.config) is None
204 def testAcceptKey(self):
205 with output_suppressed():
206 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
207 policy = Policy('http://localhost:8000/Hello', config = self.config)
208 assert policy.need_download()
209 sys.stdin = Reply("Y\n")
210 try:
211 download_and_execute(policy, ['Hello'], main = 'Missing')
212 assert 0
213 except model.SafeException, ex:
214 if "HelloWorld/Missing" not in str(ex):
215 raise
217 def testAutoAcceptKey(self):
218 self.config.auto_approve_keys = True
219 with output_suppressed():
220 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
221 policy = Policy('http://localhost:8000/Hello', config = self.config)
222 assert policy.need_download()
223 sys.stdin = Reply("")
224 try:
225 download_and_execute(policy, ['Hello'], main = 'Missing')
226 assert 0
227 except model.SafeException, ex:
228 if "HelloWorld/Missing" not in str(ex):
229 raise
231 def testDistro(self):
232 with output_suppressed():
233 native_url = 'http://example.com:8000/Native.xml'
235 # Initially, we don't have the feed at all...
236 master_feed = self.config.iface_cache.get_feed(native_url)
237 assert master_feed is None, master_feed
239 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
240 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
241 policy = Policy(native_url, config = self.config)
242 assert policy.need_download()
244 solve = policy.solve_with_downloads()
245 self.config.handler.wait_for_blocker(solve)
246 tasks.check(solve)
248 master_feed = self.config.iface_cache.get_feed(native_url)
249 assert master_feed is not None
250 assert master_feed.implementations == {}
252 distro_feed_url = master_feed.get_distro_feed()
253 assert distro_feed_url is not None
254 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
255 assert distro_feed is not None
256 assert len(distro_feed.implementations) == 2, distro_feed.implementations
258 def testWrongSize(self):
259 with output_suppressed():
260 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
261 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
262 policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
263 assert policy.need_download()
264 sys.stdin = Reply("Y\n")
265 try:
266 download_and_execute(policy, ['Hello'], main = 'Missing')
267 assert 0
268 except model.SafeException, ex:
269 if "Downloaded archive has incorrect size" not in str(ex):
270 raise ex
272 def testImplementationGenerateMissingId(self):
273 old_out = sys.stdout
274 try:
275 sys.stdout = StringIO()
276 self.child = server.handle_requests(('HelloWorld.tgz'))
278 from zeroinstall.injector import qdom
279 root = qdom.parse(file(os.path.abspath('ImplementationNoId.xml')))
281 from zeroinstall.zerostore import manifest
282 alg = manifest.get_algorithm('sha1')
283 assert alg
285 from zeroinstall.injector.model import ZeroInstallFeed
286 feed = ZeroInstallFeed(root, None, None, alg, self.config.fetcher, self.config.stores)
288 expected_id = 'sha1=3ce644dc725f1d21cfcf02562c76f375944b266a'
289 assert feed.implementations[expected_id]
290 assert feed.implementations[expected_id].id == expected_id
291 finally:
292 sys.stdout = old_out
294 def testRecipe(self):
295 old_out = sys.stdout
296 try:
297 sys.stdout = StringIO()
298 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
299 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
300 try:
301 download_and_execute(policy, [])
302 assert False
303 except model.SafeException, ex:
304 if "HelloWorld/Missing" not in str(ex):
305 raise ex
306 finally:
307 sys.stdout = old_out
309 def testRecipeUnpack(self):
310 old_out = sys.stdout
311 try:
312 sys.stdout = StringIO()
313 self.child = server.handle_requests(('doubly_packed.tar'))
314 policy = Policy(os.path.abspath('Unpack.xml'), config = self.config)
315 try:
316 download_and_execute(policy, [])
317 assert False
318 except model.SafeException, ex:
319 if "HelloWorld/Missing" not in str(ex):
320 raise ex
321 finally:
322 sys.stdout = old_out
324 def testSymlink(self):
325 old_out = sys.stdout
326 try:
327 sys.stdout = StringIO()
328 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
329 policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
330 try:
331 download_and_execute(policy, [])
332 assert False
333 except model.SafeException, ex:
334 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
335 raise
336 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
337 finally:
338 sys.stdout = old_out
340 def testAutopackage(self):
341 old_out = sys.stdout
342 try:
343 sys.stdout = StringIO()
344 self.child = server.handle_requests('HelloWorld.autopackage')
345 policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
346 try:
347 download_and_execute(policy, [])
348 assert False
349 except model.SafeException, ex:
350 if "HelloWorld/Missing" not in str(ex):
351 raise
352 finally:
353 sys.stdout = old_out
355 def testRecipeFailure(self):
356 old_out = sys.stdout
357 try:
358 sys.stdout = StringIO()
359 self.child = server.handle_requests('*')
360 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
361 try:
362 download_and_execute(policy, [])
363 assert False
364 except download.DownloadError, ex:
365 if "Connection" not in str(ex):
366 raise
367 finally:
368 sys.stdout = old_out
370 def testMirrors(self):
371 old_out = sys.stdout
372 try:
373 sys.stdout = StringIO()
374 getLogger().setLevel(ERROR)
375 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
376 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
377 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
378 self.config.feed_mirror = 'http://example.com:8000/0mirror'
380 refreshed = policy.solve_with_downloads()
381 policy.handler.wait_for_blocker(refreshed)
382 assert policy.ready
383 finally:
384 sys.stdout = old_out
386 def testReplay(self):
387 old_out = sys.stdout
388 try:
389 sys.stdout = StringIO()
390 getLogger().setLevel(ERROR)
391 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
392 mtime = int(os.stat('Hello-new.xml').st_mtime)
393 self.config.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
395 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
396 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
397 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
398 self.config.feed_mirror = 'http://example.com:8000/0mirror'
400 # Update from mirror (should ignore out-of-date timestamp)
401 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
402 policy.handler.wait_for_blocker(refreshed)
404 # Update from upstream (should report an error)
405 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
406 try:
407 policy.handler.wait_for_blocker(refreshed)
408 raise Exception("Should have been rejected!")
409 except model.SafeException, ex:
410 assert "New feed's modification time is before old version" in str(ex)
412 # Must finish with the newest version
413 self.assertEquals(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
414 finally:
415 sys.stdout = old_out
417 def testBackground(self, verbose = False):
418 p = Policy('http://example.com:8000/Hello.xml', config = self.config)
419 self.import_feed(p.root, 'Hello.xml')
420 p.freshness = 0
421 p.network_use = model.network_minimal
422 p.solver.solve(p.root, arch.get_host_architecture())
423 assert p.ready, p.solver.get_failure_reason()
425 @tasks.async
426 def choose_download(registed_cb, nid, actions):
427 try:
428 assert actions == ['download', 'Download'], actions
429 registed_cb(nid, 'download')
430 except:
431 import traceback
432 traceback.print_exc()
433 yield None
435 global ran_gui
436 ran_gui = False
437 old_out = sys.stdout
438 try:
439 sys.stdout = StringIO()
440 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
441 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
442 my_dbus.user_callback = choose_download
443 pid = os.getpid()
444 old_exit = os._exit
445 def my_exit(code):
446 # The background handler runs in the same process
447 # as the tests, so don't let it abort.
448 if os.getpid() == pid:
449 raise SystemExit(code)
450 # But, child download processes are OK
451 old_exit(code)
452 from zeroinstall.injector import config
453 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
454 config.DEFAULT_KEY_LOOKUP_SERVER = None
455 try:
456 try:
457 os._exit = my_exit
458 background.spawn_background_update(p, verbose)
459 assert False
460 except SystemExit, ex:
461 self.assertEquals(1, ex.code)
462 finally:
463 os._exit = old_exit
464 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
465 finally:
466 sys.stdout = old_out
467 assert ran_gui
469 def testBackgroundVerbose(self):
470 self.testBackground(verbose = True)
472 if __name__ == '__main__':
473 unittest.main()