More detailed bug reports about why no version could be selected
[zeroinstall/zeroinstall-rsl.git] / tests / testdownload.py
blobf5b85f2966db945b0f26aba062c10ab419580674
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, WARN, ERROR
8 sys.path.insert(0, '..')
10 # If http_proxy is set it can cause the download tests to fail.
11 os.environ["http_proxy"] = ""
13 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
14 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
15 from zeroinstall.support import basedir, tasks
16 import data
17 import my_dbus
19 import server
# Flipped to True by raise_gui(); testBackground checks it afterwards.
ran_gui = False

def raise_gui(*args):
    """Replacement for background._exec_gui: just note that it was called."""
    global ran_gui
    ran_gui = True

# Route GUI launches to the stub above. _detach is replaced by a function
# that always returns False (presumably so the background code does not
# detach from the test process -- confirm against background.py).
background._exec_gui = raise_gui
background._detach = lambda: False

# Register the my_dbus stand-in under every dbus module name used here.
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
class Reply:
    """Minimal stand-in for sys.stdin that gives the same answer every time."""

    def __init__(self, reply):
        # The canned line handed back by each readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is not consumed, so repeated calls repeat it."""
        return self.reply
39 class DummyHandler(handler.Handler):
40 __slots__ = ['ex', 'tb']
42 def __init__(self):
43 handler.Handler.__init__(self)
44 self.ex = None
46 def wait_for_blocker(self, blocker):
47 self.ex = None
48 handler.Handler.wait_for_blocker(self, blocker)
49 if self.ex:
50 raise self.ex, None, self.tb
52 def report_error(self, ex, tb = None):
53 assert self.ex is None, self.ex
54 self.ex = ex
55 self.tb = tb
57 #import traceback
58 #traceback.print_exc()
60 class TestDownload(BaseTest):
61 def setUp(self):
62 BaseTest.setUp(self)
64 stream = tempfile.TemporaryFile()
65 stream.write(data.thomas_key)
66 stream.seek(0)
67 gpg.import_key(stream)
68 self.child = None
70 trust.trust_db.watchers = []
72 def tearDown(self):
73 BaseTest.tearDown(self)
74 if self.child is not None:
75 os.kill(self.child, signal.SIGTERM)
76 os.waitpid(self.child, 0)
77 self.child = None
79 def testRejectKey(self):
80 old_out = sys.stdout
81 try:
82 sys.stdout = StringIO()
83 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
84 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
85 handler = DummyHandler())
86 assert policy.need_download()
87 sys.stdin = Reply("N\n")
88 try:
89 policy.download_and_execute(['Hello'])
90 assert 0
91 except model.SafeException, ex:
92 if "Not signed with a trusted key" not in str(ex):
93 raise ex
94 finally:
95 sys.stdout = old_out
97 def testRejectKeyXML(self):
98 old_out = sys.stdout
99 try:
100 sys.stdout = StringIO()
101 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
102 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
103 handler = DummyHandler())
104 assert policy.need_download()
105 sys.stdin = Reply("N\n")
106 try:
107 policy.download_and_execute(['Hello'])
108 assert 0
109 except model.SafeException, ex:
110 if "Not signed with a trusted key" not in str(ex):
111 raise
112 finally:
113 sys.stdout = old_out
115 def testImport(self):
116 old_out = sys.stdout
117 try:
118 from zeroinstall.injector import cli
120 rootLogger = getLogger()
121 rootLogger.disabled = True
122 try:
123 try:
124 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
125 assert 0
126 except model.SafeException, ex:
127 assert 'NO-SUCH-FILE' in str(ex)
128 finally:
129 rootLogger.disabled = False
130 rootLogger.setLevel(WARN)
132 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
133 self.assertEquals(0, len(hello.implementations))
135 sys.stdout = StringIO()
136 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
137 sys.stdin = Reply("Y\n")
139 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
140 cli.main(['--import', 'Hello'])
141 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
143 # Check we imported the interface after trusting the key
144 reader.update_from_cache(hello)
145 self.assertEquals(1, len(hello.implementations))
147 # Shouldn't need to prompt the second time
148 sys.stdin = None
149 cli.main(['--import', 'Hello'])
150 finally:
151 sys.stdout = old_out
153 def testSelections(self):
154 from zeroinstall.injector.cli import _download_missing_selections
155 root = qdom.parse(file("selections.xml"))
156 sels = selections.Selections(root)
157 class Options: dry_run = False
159 old_out = sys.stdout
160 try:
161 sys.stdout = StringIO()
162 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
163 sys.stdin = Reply("Y\n")
164 _download_missing_selections(Options(), sels)
165 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://localhost:8000/Hello.xml'].id)
166 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
168 assert sels.download_missing(iface_cache.iface_cache, None) is None
169 finally:
170 sys.stdout = old_out
172 def testSelectionsWithFeed(self):
173 from zeroinstall.injector.cli import _download_missing_selections
174 root = qdom.parse(file("selections.xml"))
175 sels = selections.Selections(root)
176 class Options: dry_run = False
178 old_out = sys.stdout
179 try:
180 sys.stdout = StringIO()
181 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
182 sys.stdin = Reply("Y\n")
184 from zeroinstall.injector import fetch
185 from zeroinstall.injector.handler import Handler
186 handler = Handler()
187 fetcher = fetch.Fetcher(handler)
188 handler.wait_for_blocker(fetcher.download_and_import_feed('http://localhost:8000/Hello.xml', iface_cache.iface_cache))
190 _download_missing_selections(Options(), sels)
191 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://localhost:8000/Hello.xml'].id)
192 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
194 assert sels.download_missing(iface_cache.iface_cache, None) is None
195 finally:
196 sys.stdout = old_out
198 def testAcceptKey(self):
199 old_out = sys.stdout
200 try:
201 sys.stdout = StringIO()
202 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
203 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
204 handler = DummyHandler())
205 assert policy.need_download()
206 sys.stdin = Reply("Y\n")
207 try:
208 policy.download_and_execute(['Hello'], main = 'Missing')
209 assert 0
210 except model.SafeException, ex:
211 if "HelloWorld/Missing" not in str(ex):
212 raise ex
213 finally:
214 sys.stdout = old_out
216 def testRecipe(self):
217 old_out = sys.stdout
218 try:
219 sys.stdout = StringIO()
220 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
221 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
222 try:
223 policy.download_and_execute([])
224 assert False
225 except model.SafeException, ex:
226 if "HelloWorld/Missing" not in str(ex):
227 raise ex
228 finally:
229 sys.stdout = old_out
231 def testSymlink(self):
232 old_out = sys.stdout
233 try:
234 sys.stdout = StringIO()
235 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
236 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
237 handler = DummyHandler())
238 try:
239 policy.download_and_execute([])
240 assert False
241 except model.SafeException, ex:
242 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
243 raise ex
244 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
245 finally:
246 sys.stdout = old_out
248 def testAutopackage(self):
249 old_out = sys.stdout
250 try:
251 sys.stdout = StringIO()
252 self.child = server.handle_requests('HelloWorld.autopackage')
253 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
254 try:
255 policy.download_and_execute([])
256 assert False
257 except model.SafeException, ex:
258 if "HelloWorld/Missing" not in str(ex):
259 raise
260 finally:
261 sys.stdout = old_out
263 def testRecipeFailure(self):
264 old_out = sys.stdout
265 try:
266 sys.stdout = StringIO()
267 self.child = server.handle_requests('*')
268 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
269 handler = DummyHandler())
270 try:
271 policy.download_and_execute([])
272 assert False
273 except download.DownloadError, ex:
274 if "Connection" not in str(ex):
275 raise
276 finally:
277 sys.stdout = old_out
279 def testMirrors(self):
280 old_out = sys.stdout
281 try:
282 sys.stdout = StringIO()
283 getLogger().setLevel(ERROR)
284 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
285 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
286 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
287 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
289 refreshed = policy.solve_with_downloads()
290 policy.handler.wait_for_blocker(refreshed)
291 assert policy.ready
292 finally:
293 sys.stdout = old_out
295 def testReplay(self):
296 old_out = sys.stdout
297 try:
298 sys.stdout = StringIO()
299 getLogger().setLevel(ERROR)
300 iface = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello.xml')
301 mtime = int(os.stat('Hello-new.xml').st_mtime)
302 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
304 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
305 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
306 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
307 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
309 # Update from mirror (should ignore out-of-date timestamp)
310 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
311 policy.handler.wait_for_blocker(refreshed)
313 # Update from upstream (should report an error)
314 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
315 try:
316 policy.handler.wait_for_blocker(refreshed)
317 raise Exception("Should have been rejected!")
318 except model.SafeException, ex:
319 assert "New interface's modification time is before old version" in str(ex)
321 # Must finish with the newest version
322 self.assertEquals(1209206132, iface_cache.iface_cache._get_signature_date(iface.uri))
323 finally:
324 sys.stdout = old_out
326 def testBackground(self, verbose = False):
327 p = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml')
328 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
329 p.freshness = 0
330 p.network_use = model.network_minimal
331 p.solver.solve(p.root, arch.get_host_architecture())
332 assert p.ready
334 @tasks.async
335 def choose_download(registed_cb, nid, actions):
336 try:
337 assert actions == ['download', 'Download'], actions
338 registed_cb(nid, 'download')
339 except:
340 import traceback
341 traceback.print_exc()
342 yield None
344 global ran_gui
345 ran_gui = False
346 old_out = sys.stdout
347 try:
348 sys.stdout = StringIO()
349 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
350 my_dbus.user_callback = choose_download
351 pid = os.getpid()
352 old_exit = os._exit
353 def my_exit(code):
354 # The background handler runs in the same process
355 # as the tests, so don't let it abort.
356 if os.getpid() == pid:
357 raise SystemExit(code)
358 # But, child download processes are OK
359 old_exit(code)
360 try:
361 try:
362 os._exit = my_exit
363 background.spawn_background_update(p, verbose)
364 assert False
365 except SystemExit, ex:
366 self.assertEquals(1, ex.code)
367 finally:
368 os._exit = old_exit
369 finally:
370 sys.stdout = old_out
371 assert ran_gui
373 def testBackgroundVerbose(self):
374 self.testBackground(verbose = True)
# Suite object built from every test method of TestDownload.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Always run verbosely when executed directly as a script.
    sys.argv += ['-v']
    unittest.main()