Release 1.0
[zeroinstall/solver.git] / tests / testdownload.py
blobcdb72aaa67fc059e9eab713ce58c9cb1abc9097e
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 import server
24 ran_gui = False
25 def raise_gui(*args):
26 global ran_gui
27 ran_gui = True
28 background._detach = lambda: False
29 background._exec_gui = raise_gui
31 @contextmanager
32 def output_suppressed():
33 old_stdout = sys.stdout
34 old_stderr = sys.stderr
35 try:
36 sys.stdout = StringIO()
37 sys.stderr = StringIO()
38 try:
39 yield
40 except Exception:
41 raise
42 except BaseException, ex:
43 # Don't abort unit-tests if someone raises SystemExit
44 raise Exception(str(type(ex)) + " " + str(ex))
45 finally:
46 sys.stdout = old_stdout
47 sys.stderr = old_stderr
49 class Reply:
50 def __init__(self, reply):
51 self.reply = reply
53 def readline(self):
54 return self.reply
56 def download_and_execute(policy, prog_args, main = None):
57 downloaded = policy.solve_and_download_impls()
58 if downloaded:
59 policy.config.handler.wait_for_blocker(downloaded)
60 run.execute_selections(policy.solver.selections, prog_args, stores = policy.config.stores, main = main)
62 class NetworkManager:
63 def state(self):
64 return 3 # NM_STATUS_CONNECTED
66 class TestDownload(BaseTest):
67 def setUp(self):
68 BaseTest.setUp(self)
70 self.config.handler.allow_downloads = True
71 self.config.key_info_server = 'http://localhost:3333/key-info'
73 self.config.fetcher = fetch.Fetcher(self.config)
75 stream = tempfile.TemporaryFile()
76 stream.write(data.thomas_key)
77 stream.seek(0)
78 gpg.import_key(stream)
79 self.child = None
81 trust.trust_db.watchers = []
83 def tearDown(self):
84 BaseTest.tearDown(self)
85 if self.child is not None:
86 os.kill(self.child, signal.SIGTERM)
87 os.waitpid(self.child, 0)
88 self.child = None
90 def testRejectKey(self):
91 with output_suppressed():
92 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
93 policy = Policy('http://localhost:8000/Hello', config = self.config)
94 assert policy.need_download()
95 sys.stdin = Reply("N\n")
96 try:
97 download_and_execute(policy, ['Hello'])
98 assert 0
99 except model.SafeException, ex:
100 if "has no usable implementations" not in str(ex):
101 raise ex
102 if "Not signed with a trusted key" not in str(policy.handler.ex):
103 raise ex
104 self.config.handler.ex = None
106 def testRejectKeyXML(self):
107 with output_suppressed():
108 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
109 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
110 assert policy.need_download()
111 sys.stdin = Reply("N\n")
112 try:
113 download_and_execute(policy, ['Hello'])
114 assert 0
115 except model.SafeException, ex:
116 if "has no usable implementations" not in str(ex):
117 raise ex
118 if "Not signed with a trusted key" not in str(policy.handler.ex):
119 raise
120 self.config.handler.ex = None
122 def testImport(self):
123 from zeroinstall.injector import cli
125 rootLogger = getLogger()
126 rootLogger.disabled = True
127 try:
128 try:
129 cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
130 assert 0
131 except model.SafeException, ex:
132 assert 'NO-SUCH-FILE' in str(ex)
133 finally:
134 rootLogger.disabled = False
135 rootLogger.setLevel(WARN)
137 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
138 self.assertEquals(None, hello)
140 with output_suppressed():
141 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
142 sys.stdin = Reply("Y\n")
144 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
145 cli.main(['--import', 'Hello'], config = self.config)
146 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
148 # Check we imported the interface after trusting the key
149 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
150 self.assertEquals(1, len(hello.implementations))
152 # Shouldn't need to prompt the second time
153 sys.stdin = None
154 cli.main(['--import', 'Hello'], config = self.config)
156 def testSelections(self):
157 from zeroinstall.injector import cli
158 root = qdom.parse(file("selections.xml"))
159 sels = selections.Selections(root)
160 class Options: dry_run = False
162 with output_suppressed():
163 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
164 sys.stdin = Reply("Y\n")
165 try:
166 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
167 assert False
168 except NotStored:
169 pass
170 cli.main(['--download-only', 'selections.xml'], config = self.config)
171 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
172 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
174 assert sels.download_missing(self.config) is None
176 def testHelpers(self):
177 from zeroinstall import helpers
179 with output_suppressed():
180 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
181 sys.stdin = Reply("Y\n")
182 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
183 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
184 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
185 assert sels.download_missing(self.config) is None
187 def testSelectionsWithFeed(self):
188 from zeroinstall.injector import cli
189 root = qdom.parse(file("selections.xml"))
190 sels = selections.Selections(root)
192 with output_suppressed():
193 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
194 sys.stdin = Reply("Y\n")
196 self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
198 cli.main(['--download-only', 'selections.xml'], config = self.config)
199 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
200 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
202 assert sels.download_missing(self.config) is None
204 def testAcceptKey(self):
205 with output_suppressed():
206 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
207 policy = Policy('http://localhost:8000/Hello', config = self.config)
208 assert policy.need_download()
209 sys.stdin = Reply("Y\n")
210 try:
211 download_and_execute(policy, ['Hello'], main = 'Missing')
212 assert 0
213 except model.SafeException, ex:
214 if "HelloWorld/Missing" not in str(ex):
215 raise
217 def testAutoAcceptKey(self):
218 self.config.auto_approve_keys = True
219 with output_suppressed():
220 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
221 policy = Policy('http://localhost:8000/Hello', config = self.config)
222 assert policy.need_download()
223 sys.stdin = Reply("")
224 try:
225 download_and_execute(policy, ['Hello'], main = 'Missing')
226 assert 0
227 except model.SafeException, ex:
228 if "HelloWorld/Missing" not in str(ex):
229 raise
231 def testDistro(self):
232 with output_suppressed():
233 native_url = 'http://example.com:8000/Native.xml'
235 # Initially, we don't have the feed at all...
236 master_feed = self.config.iface_cache.get_feed(native_url)
237 assert master_feed is None, master_feed
239 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
240 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
241 policy = Policy(native_url, config = self.config)
242 assert policy.need_download()
244 solve = policy.solve_with_downloads()
245 self.config.handler.wait_for_blocker(solve)
246 tasks.check(solve)
248 master_feed = self.config.iface_cache.get_feed(native_url)
249 assert master_feed is not None
250 assert master_feed.implementations == {}
252 distro_feed_url = master_feed.get_distro_feed()
253 assert distro_feed_url is not None
254 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
255 assert distro_feed is not None
256 assert len(distro_feed.implementations) == 2, distro_feed.implementations
258 def testWrongSize(self):
259 with output_suppressed():
260 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
261 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
262 policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
263 assert policy.need_download()
264 sys.stdin = Reply("Y\n")
265 try:
266 download_and_execute(policy, ['Hello'], main = 'Missing')
267 assert 0
268 except model.SafeException, ex:
269 if "Downloaded archive has incorrect size" not in str(ex):
270 raise ex
272 def testRecipe(self):
273 old_out = sys.stdout
274 try:
275 sys.stdout = StringIO()
276 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
277 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
278 try:
279 download_and_execute(policy, [])
280 assert False
281 except model.SafeException, ex:
282 if "HelloWorld/Missing" not in str(ex):
283 raise ex
284 finally:
285 sys.stdout = old_out
287 def testSymlink(self):
288 old_out = sys.stdout
289 try:
290 sys.stdout = StringIO()
291 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
292 policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
293 try:
294 download_and_execute(policy, [])
295 assert False
296 except model.SafeException, ex:
297 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
298 raise
299 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
300 finally:
301 sys.stdout = old_out
303 def testAutopackage(self):
304 old_out = sys.stdout
305 try:
306 sys.stdout = StringIO()
307 self.child = server.handle_requests('HelloWorld.autopackage')
308 policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
309 try:
310 download_and_execute(policy, [])
311 assert False
312 except model.SafeException, ex:
313 if "HelloWorld/Missing" not in str(ex):
314 raise
315 finally:
316 sys.stdout = old_out
318 def testRecipeFailure(self):
319 old_out = sys.stdout
320 try:
321 sys.stdout = StringIO()
322 self.child = server.handle_requests('*')
323 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
324 try:
325 download_and_execute(policy, [])
326 assert False
327 except download.DownloadError, ex:
328 if "Connection" not in str(ex):
329 raise
330 finally:
331 sys.stdout = old_out
333 def testMirrors(self):
334 old_out = sys.stdout
335 try:
336 sys.stdout = StringIO()
337 getLogger().setLevel(ERROR)
338 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
339 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
340 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
341 self.config.feed_mirror = 'http://example.com:8000/0mirror'
343 refreshed = policy.solve_with_downloads()
344 policy.handler.wait_for_blocker(refreshed)
345 assert policy.ready
346 finally:
347 sys.stdout = old_out
349 def testReplay(self):
350 old_out = sys.stdout
351 try:
352 sys.stdout = StringIO()
353 getLogger().setLevel(ERROR)
354 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
355 mtime = int(os.stat('Hello-new.xml').st_mtime)
356 self.config.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
358 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
359 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
360 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
361 self.config.feed_mirror = 'http://example.com:8000/0mirror'
363 # Update from mirror (should ignore out-of-date timestamp)
364 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
365 policy.handler.wait_for_blocker(refreshed)
367 # Update from upstream (should report an error)
368 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
369 try:
370 policy.handler.wait_for_blocker(refreshed)
371 raise Exception("Should have been rejected!")
372 except model.SafeException, ex:
373 assert "New feed's modification time is before old version" in str(ex)
375 # Must finish with the newest version
376 self.assertEquals(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
377 finally:
378 sys.stdout = old_out
380 def testBackground(self, verbose = False):
381 p = Policy('http://example.com:8000/Hello.xml', config = self.config)
382 self.import_feed(p.root, 'Hello.xml')
383 p.freshness = 0
384 p.network_use = model.network_minimal
385 p.solver.solve(p.root, arch.get_host_architecture())
386 assert p.ready, p.solver.get_failure_reason()
388 @tasks.async
389 def choose_download(registed_cb, nid, actions):
390 try:
391 assert actions == ['download', 'Download'], actions
392 registed_cb(nid, 'download')
393 except:
394 import traceback
395 traceback.print_exc()
396 yield None
398 global ran_gui
399 ran_gui = False
400 old_out = sys.stdout
401 try:
402 sys.stdout = StringIO()
403 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
404 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
405 my_dbus.user_callback = choose_download
406 pid = os.getpid()
407 old_exit = os._exit
408 def my_exit(code):
409 # The background handler runs in the same process
410 # as the tests, so don't let it abort.
411 if os.getpid() == pid:
412 raise SystemExit(code)
413 # But, child download processes are OK
414 old_exit(code)
415 from zeroinstall.injector import config
416 key_info = config.DEFAULT_KEY_LOOKUP_SERVER
417 config.DEFAULT_KEY_LOOKUP_SERVER = None
418 try:
419 try:
420 os._exit = my_exit
421 background.spawn_background_update(p, verbose)
422 assert False
423 except SystemExit, ex:
424 self.assertEquals(1, ex.code)
425 finally:
426 os._exit = old_exit
427 config.DEFAULT_KEY_LOOKUP_SERVER = key_info
428 finally:
429 sys.stdout = old_out
430 assert ran_gui
432 def testBackgroundVerbose(self):
433 self.testBackground(verbose = True)
435 if __name__ == '__main__':
436 unittest.main()