Added Selections.download_missing
[zeroinstall/zeroinstall-mseaborn.git] / tests / testdownload.py
blob1add4719001d46ee4605313f248ff109869da581
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, DEBUG, INFO, WARN, ERROR
8 sys.path.insert(0, '..')
10 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
11 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
12 from zeroinstall.support import basedir, tasks
13 import data
14 import my_dbus
16 import server
# Module-level flag: flipped to True by raise_gui() and checked by the tests.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI would have been launched.

    Accepts and ignores any positional arguments; the only effect is to set
    the module-level ``ran_gui`` flag to True.
    """
    globals()['ran_gui'] = True
# Patch the background module: _detach always reports False, and _exec_gui
# is replaced by raise_gui, which only sets the ran_gui flag.
background._exec_gui = raise_gui
background._detach = lambda: False

# Make every dbus import resolve to the local my_dbus stub, including the
# 'types' attribute/submodule.
my_dbus.types = my_dbus
sys.modules.update(dict.fromkeys(('dbus', 'dbus.glib', 'dbus.types'), my_dbus))
class Reply:
    """Object with a readline() method that always returns one fixed string.

    The tests assign instances to sys.stdin to supply a canned answer.
    """

    def __init__(self, reply):
        # The text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the stored reply; the same value is returned on every call."""
        return self.reply
36 class DummyHandler(handler.Handler):
37 __slots__ = ['ex', 'tb']
39 def __init__(self):
40 handler.Handler.__init__(self)
41 self.ex = None
43 def wait_for_blocker(self, blocker):
44 self.ex = None
45 handler.Handler.wait_for_blocker(self, blocker)
46 if self.ex:
47 raise self.ex, None, self.tb
49 def report_error(self, ex, tb = None):
50 assert self.ex is None, self.ex
51 self.ex = ex
52 self.tb = tb
54 #import traceback
55 #traceback.print_exc()
57 class TestDownload(BaseTest):
58 def setUp(self):
59 BaseTest.setUp(self)
61 stream = tempfile.TemporaryFile()
62 stream.write(data.thomas_key)
63 stream.seek(0)
64 gpg.import_key(stream)
65 self.child = None
67 trust.trust_db.watchers = []
69 def tearDown(self):
70 BaseTest.tearDown(self)
71 if self.child is not None:
72 os.kill(self.child, signal.SIGTERM)
73 os.waitpid(self.child, 0)
74 self.child = None
76 def testRejectKey(self):
77 old_out = sys.stdout
78 try:
79 sys.stdout = StringIO()
80 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
81 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
82 handler = DummyHandler())
83 assert policy.need_download()
84 sys.stdin = Reply("N\n")
85 try:
86 policy.download_and_execute(['Hello'])
87 assert 0
88 except model.SafeException, ex:
89 if "Not signed with a trusted key" not in str(ex):
90 raise ex
91 finally:
92 sys.stdout = old_out
94 def testRejectKeyXML(self):
95 old_out = sys.stdout
96 try:
97 sys.stdout = StringIO()
98 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
99 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
100 handler = DummyHandler())
101 assert policy.need_download()
102 sys.stdin = Reply("N\n")
103 try:
104 policy.download_and_execute(['Hello'])
105 assert 0
106 except model.SafeException, ex:
107 if "Not signed with a trusted key" not in str(ex):
108 raise
109 finally:
110 sys.stdout = old_out
112 def testImport(self):
113 old_out = sys.stdout
114 try:
115 from zeroinstall.injector import cli
116 import logging
118 rootLogger = getLogger()
119 rootLogger.disabled = True
120 try:
121 try:
122 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
123 assert 0
124 except model.SafeException, ex:
125 assert 'NO-SUCH-FILE' in str(ex)
126 finally:
127 rootLogger.disabled = False
128 rootLogger.setLevel(WARN)
130 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
131 self.assertEquals(0, len(hello.implementations))
133 sys.stdout = StringIO()
134 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
135 sys.stdin = Reply("Y\n")
137 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
138 cli.main(['--import', 'Hello'])
139 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
141 # Check we imported the interface after trusting the key
142 reader.update_from_cache(hello)
143 self.assertEquals(1, len(hello.implementations))
145 # Shouldn't need to prompt the second time
146 sys.stdin = None
147 cli.main(['--import', 'Hello'])
148 finally:
149 sys.stdout = old_out
151 def testSelections(self):
152 from zeroinstall.injector.cli import _download_missing_selections
153 root = qdom.parse(file("selections.xml"))
154 sels = selections.Selections(root)
155 class Options: dry_run = False
157 old_out = sys.stdout
158 try:
159 sys.stdout = StringIO()
160 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
161 sys.stdin = Reply("Y\n")
162 _download_missing_selections(Options(), sels)
163 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://localhost:8000/Hello.xml'].id)
164 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
166 assert sels.download_missing(iface_cache.iface_cache, None) is None
167 finally:
168 sys.stdout = old_out
170 def testAcceptKey(self):
171 old_out = sys.stdout
172 try:
173 sys.stdout = StringIO()
174 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
175 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
176 handler = DummyHandler())
177 assert policy.need_download()
178 sys.stdin = Reply("Y\n")
179 try:
180 policy.download_and_execute(['Hello'], main = 'Missing')
181 assert 0
182 except model.SafeException, ex:
183 if "HelloWorld/Missing" not in str(ex):
184 raise ex
185 finally:
186 sys.stdout = old_out
188 def testRecipe(self):
189 old_out = sys.stdout
190 try:
191 sys.stdout = StringIO()
192 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
193 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
194 try:
195 policy.download_and_execute([])
196 assert False
197 except model.SafeException, ex:
198 if "HelloWorld/Missing" not in str(ex):
199 raise ex
200 finally:
201 sys.stdout = old_out
203 def testSymlink(self):
204 old_out = sys.stdout
205 try:
206 sys.stdout = StringIO()
207 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
208 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
209 handler = DummyHandler())
210 try:
211 policy.download_and_execute([])
212 assert False
213 except model.SafeException, ex:
214 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
215 raise ex
216 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
217 finally:
218 sys.stdout = old_out
220 def testAutopackage(self):
221 old_out = sys.stdout
222 try:
223 sys.stdout = StringIO()
224 self.child = server.handle_requests('HelloWorld.autopackage')
225 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
226 try:
227 policy.download_and_execute([])
228 assert False
229 except model.SafeException, ex:
230 if "HelloWorld/Missing" not in str(ex):
231 raise
232 finally:
233 sys.stdout = old_out
235 def testRecipeFailure(self):
236 old_out = sys.stdout
237 try:
238 sys.stdout = StringIO()
239 self.child = server.handle_requests('*')
240 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
241 handler = DummyHandler())
242 try:
243 policy.download_and_execute([])
244 assert False
245 except download.DownloadError, ex:
246 if "Connection" not in str(ex):
247 raise
248 finally:
249 sys.stdout = old_out
251 def testMirrors(self):
252 old_out = sys.stdout
253 try:
254 sys.stdout = StringIO()
255 getLogger().setLevel(ERROR)
256 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
257 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
258 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
259 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
261 refreshed = policy.solve_with_downloads()
262 policy.handler.wait_for_blocker(refreshed)
263 assert policy.ready
264 finally:
265 sys.stdout = old_out
267 def testReplay(self):
268 old_out = sys.stdout
269 try:
270 sys.stdout = StringIO()
271 getLogger().setLevel(ERROR)
272 iface = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello.xml')
273 mtime = int(os.stat('Hello-new.xml').st_mtime)
274 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
276 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
277 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
278 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
279 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
281 # Update from mirror (should ignore out-of-date timestamp)
282 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
283 policy.handler.wait_for_blocker(refreshed)
285 # Update from upstream (should report an error)
286 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
287 try:
288 policy.handler.wait_for_blocker(refreshed)
289 raise Exception("Should have been rejected!")
290 except model.SafeException, ex:
291 assert "New interface's modification time is before old version" in str(ex)
293 # Must finish with the newest version
294 self.assertEquals(1209206132, iface_cache.iface_cache._get_signature_date(iface.uri))
295 finally:
296 sys.stdout = old_out
298 def testBackground(self, verbose = False):
299 p = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml')
300 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
301 p.freshness = 0
302 p.network_use = model.network_minimal
303 p.solver.solve(p.root, arch.get_host_architecture())
304 assert p.ready
306 @tasks.async
307 def choose_download(registed_cb, nid, actions):
308 try:
309 assert actions == ['download', 'Download'], actions
310 registed_cb(nid, 'download')
311 except:
312 import traceback
313 traceback.print_exc()
314 yield None
316 global ran_gui
317 ran_gui = False
318 old_out = sys.stdout
319 try:
320 sys.stdout = StringIO()
321 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
322 my_dbus.user_callback = choose_download
323 pid = os.getpid()
324 old_exit = os._exit
325 def my_exit(code):
326 # The background handler runs in the same process
327 # as the tests, so don't let it abort.
328 if os.getpid() == pid:
329 raise SystemExit(code)
330 # But, child download processes are OK
331 old_exit(code)
332 try:
333 try:
334 os._exit = my_exit
335 background.spawn_background_update(p, verbose)
336 assert False
337 except SystemExit, ex:
338 self.assertEquals(1, ex.code)
339 finally:
340 os._exit = old_exit
341 finally:
342 sys.stdout = old_out
343 assert ran_gui
345 def testBackgroundVerbose(self):
346 self.testBackground(verbose = True)
# Module-level suite built from the test methods of TestDownload.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Run verbosely whenever this file is executed as a script.
    sys.argv += ['-v']
    unittest.main()