# Commit: New key for Ilja Honkonen
# Repository: zeroinstall/zeroinstall-rsl.git -- tests/testdownload.py
# Blob: 7435edab21d3010efabe336d00743bb55f26937b
1 #!/usr/bin/env python2.5
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
# Put the parent directory first on the module search path so that the
# imports below pick up the source tree next to this tests directory.
sys.path[:0] = ['..']

# Send HTTP traffic through a proxy on localhost:8000 (presumably the test
# server started by server.handle_requests -- confirm against server.py).
os.environ.update(http_proxy="localhost:8000")
14 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
15 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
16 from zeroinstall.support import basedir, tasks
17 import data
18 import my_dbus
20 import server
# Set to True once raise_gui() has been called; tests reset and inspect it.
ran_gui = False


def raise_gui(*args):
    """Stand-in for the GUI launcher: record that it was invoked.

    All positional arguments are accepted and ignored.
    """
    global ran_gui
    ran_gui = True
# Replace the background updater's hooks: _detach always returns False and
# launching the GUI merely sets the ran_gui flag.
background._exec_gui = raise_gui
background._detach = lambda: False

# Serve the my_dbus stub whenever dbus, dbus.glib or dbus.types is imported.
my_dbus.types = my_dbus
for _stubbed_module in ('dbus', 'dbus.glib', 'dbus.types'):
    sys.modules[_stubbed_module] = my_dbus
del _stubbed_module
@contextmanager
def output_suppressed():
    """Context manager that discards stdout and stderr output.

    While the managed block runs, sys.stdout and sys.stderr each point at a
    fresh StringIO. The original streams are put back on exit, including
    when the block raises.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply:
    """Minimal stdin replacement that answers every prompt identically.

    Tests assign an instance to sys.stdin so that code reading a line of
    user input receives the canned reply.
    """

    def __init__(self, reply):
        # The text returned by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is never consumed or exhausted."""
        canned = self.reply
        return canned
52 class DummyHandler(handler.Handler):
53 __slots__ = ['ex', 'tb']
55 def __init__(self):
56 handler.Handler.__init__(self)
57 self.ex = None
59 def wait_for_blocker(self, blocker):
60 self.ex = None
61 handler.Handler.wait_for_blocker(self, blocker)
62 if self.ex:
63 raise self.ex, None, self.tb
65 def report_error(self, ex, tb = None):
66 assert self.ex is None, self.ex
67 self.ex = ex
68 self.tb = tb
70 #import traceback
71 #traceback.print_exc()
73 class TestDownload(BaseTest):
74 def setUp(self):
75 BaseTest.setUp(self)
77 stream = tempfile.TemporaryFile()
78 stream.write(data.thomas_key)
79 stream.seek(0)
80 gpg.import_key(stream)
81 self.child = None
83 trust.trust_db.watchers = []
85 def tearDown(self):
86 BaseTest.tearDown(self)
87 if self.child is not None:
88 os.kill(self.child, signal.SIGTERM)
89 os.waitpid(self.child, 0)
90 self.child = None
92 def testRejectKey(self):
93 with output_suppressed():
94 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
95 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
96 handler = DummyHandler())
97 assert policy.need_download()
98 sys.stdin = Reply("N\n")
99 try:
100 policy.download_and_execute(['Hello'])
101 assert 0
102 except model.SafeException, ex:
103 if "Can't find all required implementations" not in str(ex):
104 raise ex
105 if "Not signed with a trusted key" not in str(policy.handler.ex):
106 raise ex
108 def testRejectKeyXML(self):
109 with output_suppressed():
110 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
111 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
112 handler = DummyHandler())
113 assert policy.need_download()
114 sys.stdin = Reply("N\n")
115 try:
116 policy.download_and_execute(['Hello'])
117 assert 0
118 except model.SafeException, ex:
119 if "Can't find all required implementations" not in str(ex):
120 raise ex
121 if "Not signed with a trusted key" not in str(policy.handler.ex):
122 raise
124 def testImport(self):
125 from zeroinstall.injector import cli
127 rootLogger = getLogger()
128 rootLogger.disabled = True
129 try:
130 try:
131 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
132 assert 0
133 except model.SafeException, ex:
134 assert 'NO-SUCH-FILE' in str(ex)
135 finally:
136 rootLogger.disabled = False
137 rootLogger.setLevel(WARN)
139 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
140 self.assertEquals(0, len(hello.implementations))
142 with output_suppressed():
143 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
144 sys.stdin = Reply("Y\n")
146 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
147 cli.main(['--import', 'Hello'])
148 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
150 # Check we imported the interface after trusting the key
151 reader.update_from_cache(hello)
152 self.assertEquals(1, len(hello.implementations))
154 # Shouldn't need to prompt the second time
155 sys.stdin = None
156 cli.main(['--import', 'Hello'])
158 def testSelections(self):
159 from zeroinstall.injector.cli import _download_missing_selections
160 root = qdom.parse(file("selections.xml"))
161 sels = selections.Selections(root)
162 class Options: dry_run = False
164 with output_suppressed():
165 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
166 sys.stdin = Reply("Y\n")
167 _download_missing_selections(Options(), sels)
168 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
169 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
171 assert sels.download_missing(iface_cache.iface_cache, None) is None
173 def testSelectionsWithFeed(self):
174 from zeroinstall.injector.cli import _download_missing_selections
175 root = qdom.parse(file("selections.xml"))
176 sels = selections.Selections(root)
177 class Options: dry_run = False
179 with output_suppressed():
180 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
181 sys.stdin = Reply("Y\n")
183 from zeroinstall.injector import fetch
184 from zeroinstall.injector.handler import Handler
185 handler = Handler()
186 fetcher = fetch.Fetcher(handler)
187 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
189 _download_missing_selections(Options(), sels)
190 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
191 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
193 assert sels.download_missing(iface_cache.iface_cache, None) is None
195 def testAcceptKey(self):
196 with output_suppressed():
197 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
198 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
199 handler = DummyHandler())
200 assert policy.need_download()
201 sys.stdin = Reply("Y\n")
202 try:
203 policy.download_and_execute(['Hello'], main = 'Missing')
204 assert 0
205 except model.SafeException, ex:
206 if "HelloWorld/Missing" not in str(ex):
207 raise ex
209 def testRecipe(self):
210 old_out = sys.stdout
211 try:
212 sys.stdout = StringIO()
213 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
214 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
215 try:
216 policy.download_and_execute([])
217 assert False
218 except model.SafeException, ex:
219 if "HelloWorld/Missing" not in str(ex):
220 raise ex
221 finally:
222 sys.stdout = old_out
224 def testSymlink(self):
225 old_out = sys.stdout
226 try:
227 sys.stdout = StringIO()
228 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
229 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
230 handler = DummyHandler())
231 try:
232 policy.download_and_execute([])
233 assert False
234 except model.SafeException, ex:
235 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
236 raise ex
237 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
238 finally:
239 sys.stdout = old_out
241 def testAutopackage(self):
242 old_out = sys.stdout
243 try:
244 sys.stdout = StringIO()
245 self.child = server.handle_requests('HelloWorld.autopackage')
246 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
247 try:
248 policy.download_and_execute([])
249 assert False
250 except model.SafeException, ex:
251 if "HelloWorld/Missing" not in str(ex):
252 raise
253 finally:
254 sys.stdout = old_out
256 def testRecipeFailure(self):
257 old_out = sys.stdout
258 try:
259 sys.stdout = StringIO()
260 self.child = server.handle_requests('*')
261 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
262 handler = DummyHandler())
263 try:
264 policy.download_and_execute([])
265 assert False
266 except download.DownloadError, ex:
267 if "Connection" not in str(ex):
268 raise
269 finally:
270 sys.stdout = old_out
272 def testMirrors(self):
273 old_out = sys.stdout
274 try:
275 sys.stdout = StringIO()
276 getLogger().setLevel(ERROR)
277 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
278 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
279 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
280 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
282 refreshed = policy.solve_with_downloads()
283 policy.handler.wait_for_blocker(refreshed)
284 assert policy.ready
285 finally:
286 sys.stdout = old_out
288 def testReplay(self):
289 old_out = sys.stdout
290 try:
291 sys.stdout = StringIO()
292 getLogger().setLevel(ERROR)
293 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
294 mtime = int(os.stat('Hello-new.xml').st_mtime)
295 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
297 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
298 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
299 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
300 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
302 # Update from mirror (should ignore out-of-date timestamp)
303 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
304 policy.handler.wait_for_blocker(refreshed)
306 # Update from upstream (should report an error)
307 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
308 try:
309 policy.handler.wait_for_blocker(refreshed)
310 raise Exception("Should have been rejected!")
311 except model.SafeException, ex:
312 assert "New interface's modification time is before old version" in str(ex)
314 # Must finish with the newest version
315 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
316 finally:
317 sys.stdout = old_out
319 def testBackground(self, verbose = False):
320 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
321 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
322 p.freshness = 0
323 p.network_use = model.network_minimal
324 p.solver.solve(p.root, arch.get_host_architecture())
325 assert p.ready
327 @tasks.async
328 def choose_download(registed_cb, nid, actions):
329 try:
330 assert actions == ['download', 'Download'], actions
331 registed_cb(nid, 'download')
332 except:
333 import traceback
334 traceback.print_exc()
335 yield None
337 global ran_gui
338 ran_gui = False
339 old_out = sys.stdout
340 try:
341 sys.stdout = StringIO()
342 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
343 my_dbus.user_callback = choose_download
344 pid = os.getpid()
345 old_exit = os._exit
346 def my_exit(code):
347 # The background handler runs in the same process
348 # as the tests, so don't let it abort.
349 if os.getpid() == pid:
350 raise SystemExit(code)
351 # But, child download processes are OK
352 old_exit(code)
353 try:
354 try:
355 os._exit = my_exit
356 background.spawn_background_update(p, verbose)
357 assert False
358 except SystemExit, ex:
359 self.assertEquals(1, ex.code)
360 finally:
361 os._exit = old_exit
362 finally:
363 sys.stdout = old_out
364 assert ran_gui
366 def testBackgroundVerbose(self):
367 self.testBackground(verbose = True)
# Suite object built from every test method of TestDownload.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Run verbosely when this file is executed directly.
    sys.argv += ['-v']
    unittest.main()