Tests pass without get_deprecated_singleton_config
[zeroinstall.git] / tests / testdownload.py
blob43e7d4a2db1247982cf80963a155800f58a8d414
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, gpg, iface_cache, download, trust, handler, background, arch, selections, qdom, run
15 from zeroinstall.injector.policy import Policy
16 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 from zeroinstall.injector import fetch
19 import data
20 import my_dbus
22 fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
24 import server
# Flag flipped to True once the stubbed GUI launcher below has been called.
ran_gui = False


def raise_gui(*_ignored):
    """Note that a GUI launch was requested; every argument is discarded."""
    global ran_gui
    ran_gui = True
# Patch the background module for the tests: _detach becomes a function
# that always returns False, and _exec_gui becomes raise_gui, which only
# sets the module-level ran_gui flag.
# NOTE(review): presumably a False result from _detach keeps the background
# updater in the test process -- confirm against zeroinstall.injector.background.
background._detach = lambda: False
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Context manager that swallows everything written to stdout and stderr.

    While the managed block runs, sys.stdout and sys.stderr point at throwaway
    StringIO buffers; the previous streams are put back on exit, however the
    block ends.

    Ordinary exceptions propagate unchanged.  Anything that is a BaseException
    but not an Exception (e.g. SystemExit) is re-raised as a plain Exception
    whose message is the original type followed by the original message.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException:
            caught = sys.exc_info()[1]
            if isinstance(caught, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(caught)) + " " + str(caught))
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stand-in for sys.stdin which answers every read with one fixed line."""

    def __init__(self, reply):
        # The text handed back by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is not consumed, so every call gives the same text."""
        return self.reply
def download_and_execute(policy, prog_args, main = None):
    """Solve for policy, fetch whatever is missing, then run the selections.

    @param policy: object providing solve_and_download_impls(), solver and config
    @param prog_args: arguments passed through to run.execute_selections
    @param main: forwarded unchanged as the 'main' keyword of run.execute_selections
    """
    pending = policy.solve_and_download_impls()
    if pending:
        # A true result is treated as a blocker: wait on it before running.
        handler = policy.config.handler
        handler.wait_for_blocker(pending)
    chosen = policy.solver.selections
    stores = policy.config.stores
    run.execute_selections(chosen, prog_args, stores = stores, main = main)
class NetworkManager:
    """Fake NetworkManager service object whose state is always 'connected'."""

    def state(self):
        """Return the fixed connection state code."""
        connected = 3  # NM_STATUS_CONNECTED
        return connected
68 class TestDownload(BaseTest):
69 def setUp(self):
70 BaseTest.setUp(self)
72 self.config.fetcher = fetch.Fetcher(self.config.handler)
74 stream = tempfile.TemporaryFile()
75 stream.write(data.thomas_key)
76 stream.seek(0)
77 gpg.import_key(stream)
78 self.child = None
80 trust.trust_db.watchers = []
82 def tearDown(self):
83 BaseTest.tearDown(self)
84 if self.child is not None:
85 os.kill(self.child, signal.SIGTERM)
86 os.waitpid(self.child, 0)
87 self.child = None
89 def testRejectKey(self):
90 with output_suppressed():
91 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
92 policy = Policy('http://localhost:8000/Hello', config = self.config)
93 assert policy.need_download()
94 sys.stdin = Reply("N\n")
95 try:
96 download_and_execute(policy, ['Hello'])
97 assert 0
98 except model.SafeException, ex:
99 if "has no usable implementations" not in str(ex):
100 raise ex
101 if "Not signed with a trusted key" not in str(policy.handler.ex):
102 raise ex
103 self.config.handler.ex = None
105 def testRejectKeyXML(self):
106 with output_suppressed():
107 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
108 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
109 assert policy.need_download()
110 sys.stdin = Reply("N\n")
111 try:
112 download_and_execute(policy, ['Hello'])
113 assert 0
114 except model.SafeException, ex:
115 if "has no usable implementations" not in str(ex):
116 raise ex
117 if "Not signed with a trusted key" not in str(policy.handler.ex):
118 raise
119 self.config.handler.ex = None
121 def testImport(self):
122 from zeroinstall.injector import cli
124 rootLogger = getLogger()
125 rootLogger.disabled = True
126 try:
127 try:
128 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
129 assert 0
130 except model.SafeException, ex:
131 assert 'NO-SUCH-FILE' in str(ex)
132 finally:
133 rootLogger.disabled = False
134 rootLogger.setLevel(WARN)
136 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
137 self.assertEquals(None, hello)
139 with output_suppressed():
140 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
141 sys.stdin = Reply("Y\n")
143 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
144 cli.main(['--import', 'Hello'])
145 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
147 # Check we imported the interface after trusting the key
148 hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
149 self.assertEquals(1, len(hello.implementations))
151 # Shouldn't need to prompt the second time
152 sys.stdin = None
153 cli.main(['--import', 'Hello'])
155 def testSelections(self):
156 from zeroinstall.injector import cli
157 root = qdom.parse(file("selections.xml"))
158 sels = selections.Selections(root)
159 class Options: dry_run = False
161 with output_suppressed():
162 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
163 sys.stdin = Reply("Y\n")
164 try:
165 self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
166 assert False
167 except NotStored:
168 pass
169 cli.main(['--download-only', 'selections.xml'])
170 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
171 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
173 assert sels.download_missing(self.config) is None
175 def testHelpers(self):
176 from zeroinstall import helpers
178 with output_suppressed():
179 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
180 sys.stdin = Reply("Y\n")
181 sels = helpers.ensure_cached('http://example.com:8000/Hello.xml')
182 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
183 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
184 assert sels.download_missing(self.config) is None
186 def testSelectionsWithFeed(self):
187 from zeroinstall.injector import cli
188 root = qdom.parse(file("selections.xml"))
189 sels = selections.Selections(root)
191 with output_suppressed():
192 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
193 sys.stdin = Reply("Y\n")
195 self.config.handler.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))
197 cli.main(['--download-only', 'selections.xml'], config = self.config)
198 path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
199 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
201 assert sels.download_missing(self.config) is None
203 def testAcceptKey(self):
204 with output_suppressed():
205 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
206 policy = Policy('http://localhost:8000/Hello', config = self.config)
207 assert policy.need_download()
208 sys.stdin = Reply("Y\n")
209 try:
210 download_and_execute(policy, ['Hello'], main = 'Missing')
211 assert 0
212 except model.SafeException, ex:
213 if "HelloWorld/Missing" not in str(ex):
214 raise
216 def testDistro(self):
217 with output_suppressed():
218 native_url = 'http://example.com:8000/Native.xml'
220 # Initially, we don't have the feed at all...
221 master_feed = self.config.iface_cache.get_feed(native_url)
222 assert master_feed is None, master_feed
224 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
225 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
226 policy = Policy(native_url, config = self.config)
227 assert policy.need_download()
229 solve = policy.solve_with_downloads()
230 self.config.handler.wait_for_blocker(solve)
231 tasks.check(solve)
233 master_feed = self.config.iface_cache.get_feed(native_url)
234 assert master_feed is not None
235 assert master_feed.implementations == {}
237 distro_feed_url = master_feed.get_distro_feed()
238 assert distro_feed_url is not None
239 distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
240 assert distro_feed is not None
241 assert len(distro_feed.implementations) == 2, distro_feed.implementations
243 def testWrongSize(self):
244 with output_suppressed():
245 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
246 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
247 policy = Policy('http://localhost:8000/Hello-wrong-size', config = self.config)
248 assert policy.need_download()
249 sys.stdin = Reply("Y\n")
250 try:
251 download_and_execute(policy, ['Hello'], main = 'Missing')
252 assert 0
253 except model.SafeException, ex:
254 if "Downloaded archive has incorrect size" not in str(ex):
255 raise ex
257 def testRecipe(self):
258 old_out = sys.stdout
259 try:
260 sys.stdout = StringIO()
261 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
262 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
263 try:
264 download_and_execute(policy, [])
265 assert False
266 except model.SafeException, ex:
267 if "HelloWorld/Missing" not in str(ex):
268 raise ex
269 finally:
270 sys.stdout = old_out
272 def testSymlink(self):
273 old_out = sys.stdout
274 try:
275 sys.stdout = StringIO()
276 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
277 policy = Policy(os.path.abspath('RecipeSymlink.xml'), config = self.config)
278 try:
279 download_and_execute(policy, [])
280 assert False
281 except model.SafeException, ex:
282 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
283 raise
284 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
285 finally:
286 sys.stdout = old_out
288 def testAutopackage(self):
289 old_out = sys.stdout
290 try:
291 sys.stdout = StringIO()
292 self.child = server.handle_requests('HelloWorld.autopackage')
293 policy = Policy(os.path.abspath('Autopackage.xml'), config = self.config)
294 try:
295 download_and_execute(policy, [])
296 assert False
297 except model.SafeException, ex:
298 if "HelloWorld/Missing" not in str(ex):
299 raise
300 finally:
301 sys.stdout = old_out
303 def testRecipeFailure(self):
304 old_out = sys.stdout
305 try:
306 sys.stdout = StringIO()
307 self.child = server.handle_requests('*')
308 policy = Policy(os.path.abspath('Recipe.xml'), config = self.config)
309 try:
310 download_and_execute(policy, [])
311 assert False
312 except download.DownloadError, ex:
313 if "Connection" not in str(ex):
314 raise
315 finally:
316 sys.stdout = old_out
318 def testMirrors(self):
319 old_out = sys.stdout
320 try:
321 sys.stdout = StringIO()
322 getLogger().setLevel(ERROR)
323 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
324 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
325 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
326 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
328 refreshed = policy.solve_with_downloads()
329 policy.handler.wait_for_blocker(refreshed)
330 assert policy.ready
331 finally:
332 sys.stdout = old_out
334 def testReplay(self):
335 old_out = sys.stdout
336 try:
337 sys.stdout = StringIO()
338 getLogger().setLevel(ERROR)
339 iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
340 mtime = int(os.stat('Hello-new.xml').st_mtime)
341 self.config.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
343 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
344 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
345 policy = Policy('http://example.com:8000/Hello.xml', config = self.config)
346 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
348 # Update from mirror (should ignore out-of-date timestamp)
349 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
350 policy.handler.wait_for_blocker(refreshed)
352 # Update from upstream (should report an error)
353 refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
354 try:
355 policy.handler.wait_for_blocker(refreshed)
356 raise Exception("Should have been rejected!")
357 except model.SafeException, ex:
358 assert "New feed's modification time is before old version" in str(ex)
360 # Must finish with the newest version
361 self.assertEquals(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
362 finally:
363 sys.stdout = old_out
365 def testBackground(self, verbose = False):
366 p = Policy('http://example.com:8000/Hello.xml', config = self.config)
367 self.import_feed(p.root, 'Hello.xml')
368 p.freshness = 0
369 p.network_use = model.network_minimal
370 p.solver.solve(p.root, arch.get_host_architecture())
371 assert p.ready, p.solver.get_failure_reason()
373 @tasks.async
374 def choose_download(registed_cb, nid, actions):
375 try:
376 assert actions == ['download', 'Download'], actions
377 registed_cb(nid, 'download')
378 except:
379 import traceback
380 traceback.print_exc()
381 yield None
383 global ran_gui
384 ran_gui = False
385 old_out = sys.stdout
386 try:
387 sys.stdout = StringIO()
388 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
389 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
390 my_dbus.user_callback = choose_download
391 pid = os.getpid()
392 old_exit = os._exit
393 def my_exit(code):
394 # The background handler runs in the same process
395 # as the tests, so don't let it abort.
396 if os.getpid() == pid:
397 raise SystemExit(code)
398 # But, child download processes are OK
399 old_exit(code)
400 key_info = fetch.DEFAULT_KEY_LOOKUP_SERVER
401 fetch.DEFAULT_KEY_LOOKUP_SERVER = None
402 try:
403 try:
404 os._exit = my_exit
405 background.spawn_background_update(p, verbose)
406 assert False
407 except SystemExit, ex:
408 self.assertEquals(1, ex.code)
409 finally:
410 os._exit = old_exit
411 fetch.DEFAULT_KEY_LOOKUP_SERVER = key_info
412 finally:
413 sys.stdout = old_out
414 assert ran_gui
416 def testBackgroundVerbose(self):
417 self.testBackground(verbose = True)
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()