Allow user to skip key lookup
[zeroinstall/zeroinstall-rsl.git] / tests / testdownload.py
blob d7af08b646dd23238a63cff8b1eb2a3915c81f23
#!/usr/bin/env python2.5
from __future__ import with_statement
from basetest import BaseTest
import sys
import tempfile
import os
from StringIO import StringIO
import unittest
import signal
from logging import getLogger, WARN, ERROR
from contextlib import contextmanager

# Put the parent directory first on the import path, and export it as
# PYTHONPATH (presumably so child processes import the same tree -- confirm).
sys.path.insert(0, '..')
os.environ['PYTHONPATH'] = os.path.abspath('..')

# Send HTTP traffic through localhost:8000; the tests below fetch
# http://example.com:8000/... URLs (presumably answered by the local test
# server started with server.handle_requests -- confirm).
os.environ["http_proxy"] = "localhost:8000"

# These imports must stay below the path/environment setup above.
from zeroinstall.injector import (
    model, autopolicy, gpg, iface_cache, download, reader, trust, handler,
    background, arch, selections, qdom)
from zeroinstall.zerostore import Store
# Replace Store._add_with_helper with a stub that always returns False.
Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir, tasks
from zeroinstall.injector import fetch
import data
import my_dbus

# Point key-information lookups at a local address instead of the default.
fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'

import server
# Flag flipped by raise_gui(); testBackground resets it and then checks it.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI hook was invoked; the arguments are ignored."""
    global ran_gui
    ran_gui = True
# Hook the background updater: _detach is replaced by a function returning
# False and the GUI launcher by raise_gui, which only sets the ran_gui flag.
background._detach = lambda: False
background._exec_gui = raise_gui

# Register my_dbus under the names 'dbus', 'dbus.glib' and 'dbus.types', so
# that importing any of them yields the my_dbus module.
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
@contextmanager
def output_suppressed():
    """Context manager that swallows anything written to stdout and stderr.

    sys.stdout and sys.stderr are pointed at fresh StringIO buffers while the
    body runs; the previous stream objects are put back on exit, whether the
    body finishes normally or raises.
    """
    saved_streams = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = StringIO(), StringIO()
    try:
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
class Reply(object):
    """Minimal stand-in for sys.stdin that always answers with a fixed line.

    Declared as a new-style class (explicit ``object`` base) so it behaves
    like every other class under Python 2 as well.
    """

    def __init__(self, reply):
        # The text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; the same text is returned on every call."""
        return self.reply
56 class DummyHandler(handler.Handler):
57 __slots__ = ['ex', 'tb']
59 def __init__(self):
60 handler.Handler.__init__(self)
61 self.ex = None
63 def wait_for_blocker(self, blocker):
64 self.ex = None
65 handler.Handler.wait_for_blocker(self, blocker)
66 if self.ex:
67 raise self.ex, None, self.tb
69 def report_error(self, ex, tb = None):
70 assert self.ex is None, self.ex
71 self.ex = ex
72 self.tb = tb
74 #import traceback
75 #traceback.print_exc()
77 class TestDownload(BaseTest):
78 def setUp(self):
79 BaseTest.setUp(self)
81 stream = tempfile.TemporaryFile()
82 stream.write(data.thomas_key)
83 stream.seek(0)
84 gpg.import_key(stream)
85 self.child = None
87 trust.trust_db.watchers = []
89 def tearDown(self):
90 BaseTest.tearDown(self)
91 if self.child is not None:
92 os.kill(self.child, signal.SIGTERM)
93 os.waitpid(self.child, 0)
94 self.child = None
96 def testRejectKey(self):
97 with output_suppressed():
98 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
99 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
100 handler = DummyHandler())
101 assert policy.need_download()
102 sys.stdin = Reply("N\n")
103 try:
104 policy.download_and_execute(['Hello'])
105 assert 0
106 except model.SafeException, ex:
107 if "Can't find all required implementations" not in str(ex):
108 raise ex
109 if "Not signed with a trusted key" not in str(policy.handler.ex):
110 raise ex
112 def testRejectKeyXML(self):
113 with output_suppressed():
114 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
115 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
116 handler = DummyHandler())
117 assert policy.need_download()
118 sys.stdin = Reply("N\n")
119 try:
120 policy.download_and_execute(['Hello'])
121 assert 0
122 except model.SafeException, ex:
123 if "Can't find all required implementations" not in str(ex):
124 raise ex
125 if "Not signed with a trusted key" not in str(policy.handler.ex):
126 raise
128 def testImport(self):
129 from zeroinstall.injector import cli
131 rootLogger = getLogger()
132 rootLogger.disabled = True
133 try:
134 try:
135 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
136 assert 0
137 except model.SafeException, ex:
138 assert 'NO-SUCH-FILE' in str(ex)
139 finally:
140 rootLogger.disabled = False
141 rootLogger.setLevel(WARN)
143 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
144 self.assertEquals(0, len(hello.implementations))
146 with output_suppressed():
147 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
148 sys.stdin = Reply("Y\n")
150 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
151 cli.main(['--import', 'Hello'])
152 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
154 # Check we imported the interface after trusting the key
155 reader.update_from_cache(hello)
156 self.assertEquals(1, len(hello.implementations))
158 # Shouldn't need to prompt the second time
159 sys.stdin = None
160 cli.main(['--import', 'Hello'])
162 def testSelections(self):
163 from zeroinstall.injector.cli import _download_missing_selections
164 root = qdom.parse(file("selections.xml"))
165 sels = selections.Selections(root)
166 class Options: dry_run = False
168 with output_suppressed():
169 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
170 sys.stdin = Reply("Y\n")
171 _download_missing_selections(Options(), sels)
172 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
173 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
175 assert sels.download_missing(iface_cache.iface_cache, None) is None
177 def testSelectionsWithFeed(self):
178 from zeroinstall.injector.cli import _download_missing_selections
179 root = qdom.parse(file("selections.xml"))
180 sels = selections.Selections(root)
181 class Options: dry_run = False
183 with output_suppressed():
184 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
185 sys.stdin = Reply("Y\n")
187 from zeroinstall.injector.handler import Handler
188 handler = Handler()
189 fetcher = fetch.Fetcher(handler)
190 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
192 _download_missing_selections(Options(), sels)
193 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://example.com:8000/Hello.xml'].id)
194 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
196 assert sels.download_missing(iface_cache.iface_cache, None) is None
198 def testAcceptKey(self):
199 with output_suppressed():
200 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
201 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
202 handler = DummyHandler())
203 assert policy.need_download()
204 sys.stdin = Reply("Y\n")
205 try:
206 policy.download_and_execute(['Hello'], main = 'Missing')
207 assert 0
208 except model.SafeException, ex:
209 if "HelloWorld/Missing" not in str(ex):
210 raise ex
212 def testRecipe(self):
213 old_out = sys.stdout
214 try:
215 sys.stdout = StringIO()
216 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
217 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
218 try:
219 policy.download_and_execute([])
220 assert False
221 except model.SafeException, ex:
222 if "HelloWorld/Missing" not in str(ex):
223 raise ex
224 finally:
225 sys.stdout = old_out
227 def testSymlink(self):
228 old_out = sys.stdout
229 try:
230 sys.stdout = StringIO()
231 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
232 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
233 handler = DummyHandler())
234 try:
235 policy.download_and_execute([])
236 assert False
237 except model.SafeException, ex:
238 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
239 raise ex
240 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
241 finally:
242 sys.stdout = old_out
244 def testAutopackage(self):
245 old_out = sys.stdout
246 try:
247 sys.stdout = StringIO()
248 self.child = server.handle_requests('HelloWorld.autopackage')
249 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
250 try:
251 policy.download_and_execute([])
252 assert False
253 except model.SafeException, ex:
254 if "HelloWorld/Missing" not in str(ex):
255 raise
256 finally:
257 sys.stdout = old_out
259 def testRecipeFailure(self):
260 old_out = sys.stdout
261 try:
262 sys.stdout = StringIO()
263 self.child = server.handle_requests('*')
264 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
265 handler = DummyHandler())
266 try:
267 policy.download_and_execute([])
268 assert False
269 except download.DownloadError, ex:
270 if "Connection" not in str(ex):
271 raise
272 finally:
273 sys.stdout = old_out
275 def testMirrors(self):
276 old_out = sys.stdout
277 try:
278 sys.stdout = StringIO()
279 getLogger().setLevel(ERROR)
280 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
281 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
282 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
283 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
285 refreshed = policy.solve_with_downloads()
286 policy.handler.wait_for_blocker(refreshed)
287 assert policy.ready
288 finally:
289 sys.stdout = old_out
291 def testReplay(self):
292 old_out = sys.stdout
293 try:
294 sys.stdout = StringIO()
295 getLogger().setLevel(ERROR)
296 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
297 mtime = int(os.stat('Hello-new.xml').st_mtime)
298 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
300 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
301 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
302 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
303 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
305 # Update from mirror (should ignore out-of-date timestamp)
306 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
307 policy.handler.wait_for_blocker(refreshed)
309 # Update from upstream (should report an error)
310 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
311 try:
312 policy.handler.wait_for_blocker(refreshed)
313 raise Exception("Should have been rejected!")
314 except model.SafeException, ex:
315 assert "New interface's modification time is before old version" in str(ex)
317 # Must finish with the newest version
318 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
319 finally:
320 sys.stdout = old_out
322 def testBackground(self, verbose = False):
323 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
324 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
325 p.freshness = 0
326 p.network_use = model.network_minimal
327 p.solver.solve(p.root, arch.get_host_architecture())
328 assert p.ready
330 @tasks.async
331 def choose_download(registed_cb, nid, actions):
332 try:
333 assert actions == ['download', 'Download'], actions
334 registed_cb(nid, 'download')
335 except:
336 import traceback
337 traceback.print_exc()
338 yield None
340 global ran_gui
341 ran_gui = False
342 old_out = sys.stdout
343 try:
344 sys.stdout = StringIO()
345 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
346 my_dbus.user_callback = choose_download
347 pid = os.getpid()
348 old_exit = os._exit
349 def my_exit(code):
350 # The background handler runs in the same process
351 # as the tests, so don't let it abort.
352 if os.getpid() == pid:
353 raise SystemExit(code)
354 # But, child download processes are OK
355 old_exit(code)
356 try:
357 try:
358 os._exit = my_exit
359 background.spawn_background_update(p, verbose)
360 assert False
361 except SystemExit, ex:
362 self.assertEquals(1, ex.code)
363 finally:
364 os._exit = old_exit
365 finally:
366 sys.stdout = old_out
367 assert ran_gui
369 def testBackgroundVerbose(self):
370 self.testBackground(verbose = True)
# Module-level suite object (presumably picked up by a combined test runner
# -- confirm against the caller).
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # When run directly, always pass -v so unittest reports each test.
    sys.argv.append('-v')
    unittest.main()