Start development series 0.37-post
[zeroinstall/zeroinstall-rsl.git] / tests / testdownload.py
blobc5445c1046ba25316262b11432d78479d77de560
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, WARN, ERROR
8 sys.path.insert(0, '..')
10 # If http_proxy is set it can cause the download tests to fail.
11 os.environ["http_proxy"] = ""
13 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
14 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
15 from zeroinstall.support import basedir, tasks
16 import data
17 import my_dbus
19 import server
21 ran_gui = False
22 def raise_gui(*args):
23 global ran_gui
24 ran_gui = True
25 background._detach = lambda: False
26 background._exec_gui = raise_gui
27 sys.modules['dbus'] = my_dbus
28 sys.modules['dbus.glib'] = my_dbus
29 my_dbus.types = my_dbus
30 sys.modules['dbus.types'] = my_dbus
32 class Reply:
33 def __init__(self, reply):
34 self.reply = reply
36 def readline(self):
37 return self.reply
39 class DummyHandler(handler.Handler):
40 __slots__ = ['ex', 'tb']
42 def __init__(self):
43 handler.Handler.__init__(self)
44 self.ex = None
46 def wait_for_blocker(self, blocker):
47 self.ex = None
48 handler.Handler.wait_for_blocker(self, blocker)
49 if self.ex:
50 raise self.ex, None, self.tb
52 def report_error(self, ex, tb = None):
53 assert self.ex is None, self.ex
54 self.ex = ex
55 self.tb = tb
57 #import traceback
58 #traceback.print_exc()
60 class TestDownload(BaseTest):
61 def setUp(self):
62 BaseTest.setUp(self)
64 stream = tempfile.TemporaryFile()
65 stream.write(data.thomas_key)
66 stream.seek(0)
67 gpg.import_key(stream)
68 self.child = None
70 trust.trust_db.watchers = []
72 def tearDown(self):
73 BaseTest.tearDown(self)
74 if self.child is not None:
75 os.kill(self.child, signal.SIGTERM)
76 os.waitpid(self.child, 0)
77 self.child = None
79 def testRejectKey(self):
80 old_out = sys.stdout
81 try:
82 sys.stdout = StringIO()
83 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
84 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
85 handler = DummyHandler())
86 assert policy.need_download()
87 sys.stdin = Reply("N\n")
88 try:
89 policy.download_and_execute(['Hello'])
90 assert 0
91 except model.SafeException, ex:
92 if "Not signed with a trusted key" not in str(ex):
93 raise ex
94 finally:
95 sys.stdout = old_out
97 def testRejectKeyXML(self):
98 old_out = sys.stdout
99 try:
100 sys.stdout = StringIO()
101 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
102 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
103 handler = DummyHandler())
104 assert policy.need_download()
105 sys.stdin = Reply("N\n")
106 try:
107 policy.download_and_execute(['Hello'])
108 assert 0
109 except model.SafeException, ex:
110 if "Not signed with a trusted key" not in str(ex):
111 raise
112 finally:
113 sys.stdout = old_out
115 def testImport(self):
116 old_out = sys.stdout
117 try:
118 from zeroinstall.injector import cli
120 rootLogger = getLogger()
121 rootLogger.disabled = True
122 try:
123 try:
124 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
125 assert 0
126 except model.SafeException, ex:
127 assert 'NO-SUCH-FILE' in str(ex)
128 finally:
129 rootLogger.disabled = False
130 rootLogger.setLevel(WARN)
132 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
133 self.assertEquals(0, len(hello.implementations))
135 sys.stdout = StringIO()
136 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
137 sys.stdin = Reply("Y\n")
139 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
140 cli.main(['--import', 'Hello'])
141 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
143 # Check we imported the interface after trusting the key
144 reader.update_from_cache(hello)
145 self.assertEquals(1, len(hello.implementations))
147 # Shouldn't need to prompt the second time
148 sys.stdin = None
149 cli.main(['--import', 'Hello'])
150 finally:
151 sys.stdout = old_out
153 def testSelections(self):
154 from zeroinstall.injector.cli import _download_missing_selections
155 root = qdom.parse(file("selections.xml"))
156 sels = selections.Selections(root)
157 class Options: dry_run = False
159 old_out = sys.stdout
160 try:
161 sys.stdout = StringIO()
162 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
163 sys.stdin = Reply("Y\n")
164 _download_missing_selections(Options(), sels)
165 path = iface_cache.iface_cache.stores.lookup(sels.selections['http://localhost:8000/Hello.xml'].id)
166 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
168 assert sels.download_missing(iface_cache.iface_cache, None) is None
169 finally:
170 sys.stdout = old_out
172 def testAcceptKey(self):
173 old_out = sys.stdout
174 try:
175 sys.stdout = StringIO()
176 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
177 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
178 handler = DummyHandler())
179 assert policy.need_download()
180 sys.stdin = Reply("Y\n")
181 try:
182 policy.download_and_execute(['Hello'], main = 'Missing')
183 assert 0
184 except model.SafeException, ex:
185 if "HelloWorld/Missing" not in str(ex):
186 raise ex
187 finally:
188 sys.stdout = old_out
190 def testRecipe(self):
191 old_out = sys.stdout
192 try:
193 sys.stdout = StringIO()
194 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
195 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
196 try:
197 policy.download_and_execute([])
198 assert False
199 except model.SafeException, ex:
200 if "HelloWorld/Missing" not in str(ex):
201 raise ex
202 finally:
203 sys.stdout = old_out
205 def testSymlink(self):
206 old_out = sys.stdout
207 try:
208 sys.stdout = StringIO()
209 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
210 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
211 handler = DummyHandler())
212 try:
213 policy.download_and_execute([])
214 assert False
215 except model.SafeException, ex:
216 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
217 raise ex
218 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
219 finally:
220 sys.stdout = old_out
222 def testAutopackage(self):
223 old_out = sys.stdout
224 try:
225 sys.stdout = StringIO()
226 self.child = server.handle_requests('HelloWorld.autopackage')
227 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
228 try:
229 policy.download_and_execute([])
230 assert False
231 except model.SafeException, ex:
232 if "HelloWorld/Missing" not in str(ex):
233 raise
234 finally:
235 sys.stdout = old_out
237 def testRecipeFailure(self):
238 old_out = sys.stdout
239 try:
240 sys.stdout = StringIO()
241 self.child = server.handle_requests('*')
242 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
243 handler = DummyHandler())
244 try:
245 policy.download_and_execute([])
246 assert False
247 except download.DownloadError, ex:
248 if "Connection" not in str(ex):
249 raise
250 finally:
251 sys.stdout = old_out
253 def testMirrors(self):
254 old_out = sys.stdout
255 try:
256 sys.stdout = StringIO()
257 getLogger().setLevel(ERROR)
258 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
259 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
260 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
261 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
263 refreshed = policy.solve_with_downloads()
264 policy.handler.wait_for_blocker(refreshed)
265 assert policy.ready
266 finally:
267 sys.stdout = old_out
269 def testReplay(self):
270 old_out = sys.stdout
271 try:
272 sys.stdout = StringIO()
273 getLogger().setLevel(ERROR)
274 iface = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello.xml')
275 mtime = int(os.stat('Hello-new.xml').st_mtime)
276 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
278 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
279 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
280 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
281 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
283 # Update from mirror (should ignore out-of-date timestamp)
284 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
285 policy.handler.wait_for_blocker(refreshed)
287 # Update from upstream (should report an error)
288 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
289 try:
290 policy.handler.wait_for_blocker(refreshed)
291 raise Exception("Should have been rejected!")
292 except model.SafeException, ex:
293 assert "New interface's modification time is before old version" in str(ex)
295 # Must finish with the newest version
296 self.assertEquals(1209206132, iface_cache.iface_cache._get_signature_date(iface.uri))
297 finally:
298 sys.stdout = old_out
300 def testBackground(self, verbose = False):
301 p = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml')
302 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
303 p.freshness = 0
304 p.network_use = model.network_minimal
305 p.solver.solve(p.root, arch.get_host_architecture())
306 assert p.ready
308 @tasks.async
309 def choose_download(registed_cb, nid, actions):
310 try:
311 assert actions == ['download', 'Download'], actions
312 registed_cb(nid, 'download')
313 except:
314 import traceback
315 traceback.print_exc()
316 yield None
318 global ran_gui
319 ran_gui = False
320 old_out = sys.stdout
321 try:
322 sys.stdout = StringIO()
323 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
324 my_dbus.user_callback = choose_download
325 pid = os.getpid()
326 old_exit = os._exit
327 def my_exit(code):
328 # The background handler runs in the same process
329 # as the tests, so don't let it abort.
330 if os.getpid() == pid:
331 raise SystemExit(code)
332 # But, child download processes are OK
333 old_exit(code)
334 try:
335 try:
336 os._exit = my_exit
337 background.spawn_background_update(p, verbose)
338 assert False
339 except SystemExit, ex:
340 self.assertEquals(1, ex.code)
341 finally:
342 os._exit = old_exit
343 finally:
344 sys.stdout = old_out
345 assert ran_gui
347 def testBackgroundVerbose(self):
348 self.testBackground(verbose = True)
350 suite = unittest.makeSuite(TestDownload)
351 if __name__ == '__main__':
352 sys.argv.append('-v')
353 unittest.main()