Import _ into each module rather than using a builtin
[zeroinstall.git] / tests / testdownload.py
blob74e08e23ac2d3114212643c1e73028271d772bfb
1 #!/usr/bin/env python2.5
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
11 os.environ['PYTHONPATH'] = os.path.abspath('..')
13 os.environ["http_proxy"] = "localhost:8000"
15 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
16 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
17 from zeroinstall.support import basedir, tasks
18 import data
19 import my_dbus
21 import server
23 ran_gui = False
24 def raise_gui(*args):
25 global ran_gui
26 ran_gui = True
27 background._detach = lambda: False
28 background._exec_gui = raise_gui
29 sys.modules['dbus'] = my_dbus
30 sys.modules['dbus.glib'] = my_dbus
31 my_dbus.types = my_dbus
32 sys.modules['dbus.types'] = my_dbus
34 @contextmanager
35 def output_suppressed():
36 old_stdout = sys.stdout
37 old_stderr = sys.stderr
38 try:
39 sys.stdout = StringIO()
40 sys.stderr = StringIO()
41 yield
42 finally:
43 sys.stdout = old_stdout
44 sys.stderr = old_stderr
46 class Reply:
47 def __init__(self, reply):
48 self.reply = reply
50 def readline(self):
51 return self.reply
53 class DummyHandler(handler.Handler):
54 __slots__ = ['ex', 'tb']
56 def __init__(self):
57 handler.Handler.__init__(self)
58 self.ex = None
60 def wait_for_blocker(self, blocker):
61 self.ex = None
62 handler.Handler.wait_for_blocker(self, blocker)
63 if self.ex:
64 raise self.ex, None, self.tb
66 def report_error(self, ex, tb = None):
67 assert self.ex is None, self.ex
68 self.ex = ex
69 self.tb = tb
71 #import traceback
72 #traceback.print_exc()
74 class TestDownload(BaseTest):
75 def setUp(self):
76 BaseTest.setUp(self)
78 stream = tempfile.TemporaryFile()
79 stream.write(data.thomas_key)
80 stream.seek(0)
81 gpg.import_key(stream)
82 self.child = None
84 trust.trust_db.watchers = []
86 def tearDown(self):
87 BaseTest.tearDown(self)
88 if self.child is not None:
89 os.kill(self.child, signal.SIGTERM)
90 os.waitpid(self.child, 0)
91 self.child = None
93 def testRejectKey(self):
94 with output_suppressed():
95 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
96 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
97 handler = DummyHandler())
98 assert policy.need_download()
99 sys.stdin = Reply("N\n")
100 try:
101 policy.download_and_execute(['Hello'])
102 assert 0
103 except model.SafeException, ex:
104 if "Can't find all required implementations" not in str(ex):
105 raise ex
106 if "Not signed with a trusted key" not in str(policy.handler.ex):
107 raise ex
109 def testRejectKeyXML(self):
110 with output_suppressed():
111 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
112 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
113 handler = DummyHandler())
114 assert policy.need_download()
115 sys.stdin = Reply("N\n")
116 try:
117 policy.download_and_execute(['Hello'])
118 assert 0
119 except model.SafeException, ex:
120 if "Can't find all required implementations" not in str(ex):
121 raise ex
122 if "Not signed with a trusted key" not in str(policy.handler.ex):
123 raise
125 def testImport(self):
126 from zeroinstall.injector import cli
128 rootLogger = getLogger()
129 rootLogger.disabled = True
130 try:
131 try:
132 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
133 assert 0
134 except model.SafeException, ex:
135 assert 'NO-SUCH-FILE' in str(ex)
136 finally:
137 rootLogger.disabled = False
138 rootLogger.setLevel(WARN)
140 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
141 self.assertEquals(0, len(hello.implementations))
143 with output_suppressed():
144 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
145 sys.stdin = Reply("Y\n")
147 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
148 cli.main(['--import', 'Hello'])
149 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
151 # Check we imported the interface after trusting the key
152 reader.update_from_cache(hello)
153 self.assertEquals(1, len(hello.implementations))
155 # Shouldn't need to prompt the second time
156 sys.stdin = None
157 cli.main(['--import', 'Hello'])
159 def testSelections(self):
160 from zeroinstall.injector.cli import _download_missing_selections
161 root = qdom.parse(file("selections.xml"))
162 sels = selections.Selections(root)
163 class Options: dry_run = False
165 with output_suppressed():
166 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
167 sys.stdin = Reply("Y\n")
168 _download_missing_selections(Options(), sels)
169 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
170 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
172 assert sels.download_missing(iface_cache.iface_cache, None) is None
174 def testSelectionsWithFeed(self):
175 from zeroinstall.injector.cli import _download_missing_selections
176 root = qdom.parse(file("selections.xml"))
177 sels = selections.Selections(root)
178 class Options: dry_run = False
180 with output_suppressed():
181 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
182 sys.stdin = Reply("Y\n")
184 from zeroinstall.injector import fetch
185 from zeroinstall.injector.handler import Handler
186 handler = Handler()
187 fetcher = fetch.Fetcher(handler)
188 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
190 _download_missing_selections(Options(), sels)
191 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
192 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
194 assert sels.download_missing(iface_cache.iface_cache, None) is None
196 def testAcceptKey(self):
197 with output_suppressed():
198 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
199 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
200 handler = DummyHandler())
201 assert policy.need_download()
202 sys.stdin = Reply("Y\n")
203 try:
204 policy.download_and_execute(['Hello'], main = 'Missing')
205 assert 0
206 except model.SafeException, ex:
207 if "HelloWorld/Missing" not in str(ex):
208 raise ex
210 def testRecipe(self):
211 old_out = sys.stdout
212 try:
213 sys.stdout = StringIO()
214 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
215 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
216 try:
217 policy.download_and_execute([])
218 assert False
219 except model.SafeException, ex:
220 if "HelloWorld/Missing" not in str(ex):
221 raise ex
222 finally:
223 sys.stdout = old_out
225 def testSymlink(self):
226 old_out = sys.stdout
227 try:
228 sys.stdout = StringIO()
229 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
230 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
231 handler = DummyHandler())
232 try:
233 policy.download_and_execute([])
234 assert False
235 except model.SafeException, ex:
236 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
237 raise ex
238 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
239 finally:
240 sys.stdout = old_out
242 def testAutopackage(self):
243 old_out = sys.stdout
244 try:
245 sys.stdout = StringIO()
246 self.child = server.handle_requests('HelloWorld.autopackage')
247 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
248 try:
249 policy.download_and_execute([])
250 assert False
251 except model.SafeException, ex:
252 if "HelloWorld/Missing" not in str(ex):
253 raise
254 finally:
255 sys.stdout = old_out
257 def testRecipeFailure(self):
258 old_out = sys.stdout
259 try:
260 sys.stdout = StringIO()
261 self.child = server.handle_requests('*')
262 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
263 handler = DummyHandler())
264 try:
265 policy.download_and_execute([])
266 assert False
267 except download.DownloadError, ex:
268 if "Connection" not in str(ex):
269 raise
270 finally:
271 sys.stdout = old_out
273 def testMirrors(self):
274 old_out = sys.stdout
275 try:
276 sys.stdout = StringIO()
277 getLogger().setLevel(ERROR)
278 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
279 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
280 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
281 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
283 refreshed = policy.solve_with_downloads()
284 policy.handler.wait_for_blocker(refreshed)
285 assert policy.ready
286 finally:
287 sys.stdout = old_out
289 def testReplay(self):
290 old_out = sys.stdout
291 try:
292 sys.stdout = StringIO()
293 getLogger().setLevel(ERROR)
294 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
295 mtime = int(os.stat('Hello-new.xml').st_mtime)
296 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
298 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
299 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
300 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
301 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
303 # Update from mirror (should ignore out-of-date timestamp)
304 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
305 policy.handler.wait_for_blocker(refreshed)
307 # Update from upstream (should report an error)
308 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
309 try:
310 policy.handler.wait_for_blocker(refreshed)
311 raise Exception("Should have been rejected!")
312 except model.SafeException, ex:
313 assert "New interface's modification time is before old version" in str(ex)
315 # Must finish with the newest version
316 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
317 finally:
318 sys.stdout = old_out
320 def testBackground(self, verbose = False):
321 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
322 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
323 p.freshness = 0
324 p.network_use = model.network_minimal
325 p.solver.solve(p.root, arch.get_host_architecture())
326 assert p.ready
328 @tasks.async
329 def choose_download(registed_cb, nid, actions):
330 try:
331 assert actions == ['download', 'Download'], actions
332 registed_cb(nid, 'download')
333 except:
334 import traceback
335 traceback.print_exc()
336 yield None
338 global ran_gui
339 ran_gui = False
340 old_out = sys.stdout
341 try:
342 sys.stdout = StringIO()
343 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
344 my_dbus.user_callback = choose_download
345 pid = os.getpid()
346 old_exit = os._exit
347 def my_exit(code):
348 # The background handler runs in the same process
349 # as the tests, so don't let it abort.
350 if os.getpid() == pid:
351 raise SystemExit(code)
352 # But, child download processes are OK
353 old_exit(code)
354 try:
355 try:
356 os._exit = my_exit
357 background.spawn_background_update(p, verbose)
358 assert False
359 except SystemExit, ex:
360 self.assertEquals(1, ex.code)
361 finally:
362 os._exit = old_exit
363 finally:
364 sys.stdout = old_out
365 assert ran_gui
367 def testBackgroundVerbose(self):
368 self.testBackground(verbose = True)
370 suite = unittest.makeSuite(TestDownload)
371 if __name__ == '__main__':
372 sys.argv.append('-v')
373 unittest.main()