When spawning a download subprocess, add zeroinstall to sys.path
[zeroinstall.git] / tests / testdownload.py
blob012819a98d565f316a69b4bc3ff53ed24d436dc6
1 #!/usr/bin/env python2.5
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
15 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
16 from zeroinstall.support import basedir, tasks
17 from zeroinstall.injector import fetch
18 import data
19 import my_dbus
21 fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
23 import server
# Flipped to True as soon as raise_gui() has been called.
ran_gui = False


def raise_gui(*_ignored):
    """Record that the GUI launcher was invoked.

    Accepts and discards any positional arguments; the only effect is to set
    the module-level ``ran_gui`` flag to True.
    """
    global ran_gui
    ran_gui = True
# Replace the background updater's hooks: _detach now always returns False
# and the GUI launcher merely records that it was called (see raise_gui).
background._detach = lambda: False
background._exec_gui = raise_gui

# Serve the fake my_dbus module for every dbus import, including the
# 'dbus.types' submodule (also reachable as the attribute my_dbus.types).
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
@contextmanager
def output_suppressed():
    """Context manager that hides stdout and stderr output.

    While the managed block runs, sys.stdout and sys.stderr are fresh
    StringIO buffers whose contents are discarded; the original streams are
    put back on exit, even if the block raises.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Stand-in for sys.stdin that answers every readline() the same way."""

    def __init__(self, reply):
        # The canned line returned by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is the same on every call."""
        return self.reply
55 class DummyHandler(handler.Handler):
56 __slots__ = ['ex', 'tb']
58 def __init__(self):
59 handler.Handler.__init__(self)
60 self.ex = None
62 def wait_for_blocker(self, blocker):
63 self.ex = None
64 handler.Handler.wait_for_blocker(self, blocker)
65 if self.ex:
66 raise self.ex, None, self.tb
68 def report_error(self, ex, tb = None):
69 assert self.ex is None, self.ex
70 self.ex = ex
71 self.tb = tb
73 #import traceback
74 #traceback.print_exc()
76 class TestDownload(BaseTest):
77 def setUp(self):
78 BaseTest.setUp(self)
80 stream = tempfile.TemporaryFile()
81 stream.write(data.thomas_key)
82 stream.seek(0)
83 gpg.import_key(stream)
84 self.child = None
86 trust.trust_db.watchers = []
88 def tearDown(self):
89 BaseTest.tearDown(self)
90 if self.child is not None:
91 os.kill(self.child, signal.SIGTERM)
92 os.waitpid(self.child, 0)
93 self.child = None
95 def testRejectKey(self):
96 with output_suppressed():
97 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
98 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
99 handler = DummyHandler())
100 assert policy.need_download()
101 sys.stdin = Reply("N\n")
102 try:
103 policy.download_and_execute(['Hello'])
104 assert 0
105 except model.SafeException, ex:
106 if "Can't find all required implementations" not in str(ex):
107 raise ex
108 if "Not signed with a trusted key" not in str(policy.handler.ex):
109 raise ex
111 def testRejectKeyXML(self):
112 with output_suppressed():
113 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
114 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
115 handler = DummyHandler())
116 assert policy.need_download()
117 sys.stdin = Reply("N\n")
118 try:
119 policy.download_and_execute(['Hello'])
120 assert 0
121 except model.SafeException, ex:
122 if "Can't find all required implementations" not in str(ex):
123 raise ex
124 if "Not signed with a trusted key" not in str(policy.handler.ex):
125 raise
127 def testImport(self):
128 from zeroinstall.injector import cli
130 rootLogger = getLogger()
131 rootLogger.disabled = True
132 try:
133 try:
134 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
135 assert 0
136 except model.SafeException, ex:
137 assert 'NO-SUCH-FILE' in str(ex)
138 finally:
139 rootLogger.disabled = False
140 rootLogger.setLevel(WARN)
142 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
143 self.assertEquals(0, len(hello.implementations))
145 with output_suppressed():
146 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
147 sys.stdin = Reply("Y\n")
149 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
150 cli.main(['--import', 'Hello'])
151 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
153 # Check we imported the interface after trusting the key
154 reader.update_from_cache(hello)
155 self.assertEquals(1, len(hello.implementations))
157 # Shouldn't need to prompt the second time
158 sys.stdin = None
159 cli.main(['--import', 'Hello'])
161 def testSelections(self):
162 from zeroinstall.injector.cli import _download_missing_selections
163 root = qdom.parse(file("selections.xml"))
164 sels = selections.Selections(root)
165 class Options: dry_run = False
167 with output_suppressed():
168 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
169 sys.stdin = Reply("Y\n")
170 _download_missing_selections(Options(), sels)
171 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
172 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
174 assert sels.download_missing(iface_cache.iface_cache, None) is None
176 def testSelectionsWithFeed(self):
177 from zeroinstall.injector.cli import _download_missing_selections
178 root = qdom.parse(file("selections.xml"))
179 sels = selections.Selections(root)
180 class Options: dry_run = False
182 with output_suppressed():
183 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
184 sys.stdin = Reply("Y\n")
186 from zeroinstall.injector.handler import Handler
187 handler = Handler()
188 fetcher = fetch.Fetcher(handler)
189 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
191 _download_missing_selections(Options(), sels)
192 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
193 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
195 assert sels.download_missing(iface_cache.iface_cache, None) is None
197 def testAcceptKey(self):
198 with output_suppressed():
199 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
200 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
201 handler = DummyHandler())
202 assert policy.need_download()
203 sys.stdin = Reply("Y\n")
204 try:
205 policy.download_and_execute(['Hello'], main = 'Missing')
206 assert 0
207 except model.SafeException, ex:
208 if "HelloWorld/Missing" not in str(ex):
209 raise ex
211 def testRecipe(self):
212 old_out = sys.stdout
213 try:
214 sys.stdout = StringIO()
215 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
216 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
217 try:
218 policy.download_and_execute([])
219 assert False
220 except model.SafeException, ex:
221 if "HelloWorld/Missing" not in str(ex):
222 raise ex
223 finally:
224 sys.stdout = old_out
226 def testSymlink(self):
227 old_out = sys.stdout
228 try:
229 sys.stdout = StringIO()
230 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
231 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
232 handler = DummyHandler())
233 try:
234 policy.download_and_execute([])
235 assert False
236 except model.SafeException, ex:
237 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
238 raise ex
239 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
240 finally:
241 sys.stdout = old_out
243 def testAutopackage(self):
244 old_out = sys.stdout
245 try:
246 sys.stdout = StringIO()
247 self.child = server.handle_requests('HelloWorld.autopackage')
248 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
249 try:
250 policy.download_and_execute([])
251 assert False
252 except model.SafeException, ex:
253 if "HelloWorld/Missing" not in str(ex):
254 raise
255 finally:
256 sys.stdout = old_out
258 def testRecipeFailure(self):
259 old_out = sys.stdout
260 try:
261 sys.stdout = StringIO()
262 self.child = server.handle_requests('*')
263 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
264 handler = DummyHandler())
265 try:
266 policy.download_and_execute([])
267 assert False
268 except download.DownloadError, ex:
269 if "Connection" not in str(ex):
270 raise
271 finally:
272 sys.stdout = old_out
274 def testMirrors(self):
275 old_out = sys.stdout
276 try:
277 sys.stdout = StringIO()
278 getLogger().setLevel(ERROR)
279 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
280 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
281 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
282 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
284 refreshed = policy.solve_with_downloads()
285 policy.handler.wait_for_blocker(refreshed)
286 assert policy.ready
287 finally:
288 sys.stdout = old_out
290 def testReplay(self):
291 old_out = sys.stdout
292 try:
293 sys.stdout = StringIO()
294 getLogger().setLevel(ERROR)
295 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
296 mtime = int(os.stat('Hello-new.xml').st_mtime)
297 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
299 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
300 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
301 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
302 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
304 # Update from mirror (should ignore out-of-date timestamp)
305 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
306 policy.handler.wait_for_blocker(refreshed)
308 # Update from upstream (should report an error)
309 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
310 try:
311 policy.handler.wait_for_blocker(refreshed)
312 raise Exception("Should have been rejected!")
313 except model.SafeException, ex:
314 assert "New interface's modification time is before old version" in str(ex)
316 # Must finish with the newest version
317 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
318 finally:
319 sys.stdout = old_out
321 def testBackground(self, verbose = False):
322 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
323 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
324 p.freshness = 0
325 p.network_use = model.network_minimal
326 p.solver.solve(p.root, arch.get_host_architecture())
327 assert p.ready
329 @tasks.async
330 def choose_download(registed_cb, nid, actions):
331 try:
332 assert actions == ['download', 'Download'], actions
333 registed_cb(nid, 'download')
334 except:
335 import traceback
336 traceback.print_exc()
337 yield None
339 global ran_gui
340 ran_gui = False
341 old_out = sys.stdout
342 try:
343 sys.stdout = StringIO()
344 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
345 my_dbus.user_callback = choose_download
346 pid = os.getpid()
347 old_exit = os._exit
348 def my_exit(code):
349 # The background handler runs in the same process
350 # as the tests, so don't let it abort.
351 if os.getpid() == pid:
352 raise SystemExit(code)
353 # But, child download processes are OK
354 old_exit(code)
355 try:
356 try:
357 os._exit = my_exit
358 background.spawn_background_update(p, verbose)
359 assert False
360 except SystemExit, ex:
361 self.assertEquals(1, ex.code)
362 finally:
363 os._exit = old_exit
364 finally:
365 sys.stdout = old_out
366 assert ran_gui
368 def testBackgroundVerbose(self):
369 self.testBackground(verbose = True)
# Suite containing every TestDownload test, exposed at module level.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Run directly: add '-v' to the command line before unittest parses it.
    sys.argv.append('-v')
    unittest.main()