Include arch in implementation ID for all distribution packages
[zeroinstall/zeroinstall-afb.git] / tests / testdownload.py
blobae8dbd773392baf93cab7e8b0b382e9353ff9483
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest
4 import sys, tempfile, os
5 from StringIO import StringIO
6 import unittest, signal
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch, selections, qdom
15 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
16 from zeroinstall.support import basedir, tasks
17 from zeroinstall.injector import fetch
18 import data
19 import my_dbus
21 fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
23 import server
25 ran_gui = False
26 def raise_gui(*args):
27 global ran_gui
28 ran_gui = True
29 background._detach = lambda: False
30 background._exec_gui = raise_gui
32 @contextmanager
33 def output_suppressed():
34 old_stdout = sys.stdout
35 old_stderr = sys.stderr
36 try:
37 sys.stdout = StringIO()
38 sys.stderr = StringIO()
39 yield
40 finally:
41 sys.stdout = old_stdout
42 sys.stderr = old_stderr
44 class Reply:
45 def __init__(self, reply):
46 self.reply = reply
48 def readline(self):
49 return self.reply
51 class DummyHandler(handler.Handler):
52 __slots__ = ['ex', 'tb']
54 def __init__(self):
55 handler.Handler.__init__(self)
56 self.ex = None
58 def wait_for_blocker(self, blocker):
59 self.ex = None
60 handler.Handler.wait_for_blocker(self, blocker)
61 if self.ex:
62 raise self.ex, None, self.tb
64 def report_error(self, ex, tb = None):
65 assert self.ex is None, self.ex
66 self.ex = ex
67 self.tb = tb
69 #import traceback
70 #traceback.print_exc()
72 class NetworkManager:
73 def state(self):
74 return 3 # NM_STATUS_CONNECTED
76 class TestDownload(BaseTest):
77 def setUp(self):
78 BaseTest.setUp(self)
80 stream = tempfile.TemporaryFile()
81 stream.write(data.thomas_key)
82 stream.seek(0)
83 gpg.import_key(stream)
84 self.child = None
86 trust.trust_db.watchers = []
88 def tearDown(self):
89 BaseTest.tearDown(self)
90 if self.child is not None:
91 os.kill(self.child, signal.SIGTERM)
92 os.waitpid(self.child, 0)
93 self.child = None
95 def testRejectKey(self):
96 with output_suppressed():
97 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
98 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
99 handler = DummyHandler())
100 assert policy.need_download()
101 sys.stdin = Reply("N\n")
102 try:
103 policy.download_and_execute(['Hello'])
104 assert 0
105 except model.SafeException, ex:
106 if "Can't find all required implementations" not in str(ex):
107 raise ex
108 if "Not signed with a trusted key" not in str(policy.handler.ex):
109 raise ex
111 def testRejectKeyXML(self):
112 with output_suppressed():
113 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
114 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False,
115 handler = DummyHandler())
116 assert policy.need_download()
117 sys.stdin = Reply("N\n")
118 try:
119 policy.download_and_execute(['Hello'])
120 assert 0
121 except model.SafeException, ex:
122 if "Can't find all required implementations" not in str(ex):
123 raise ex
124 if "Not signed with a trusted key" not in str(policy.handler.ex):
125 raise
127 def testImport(self):
128 from zeroinstall.injector import cli
130 rootLogger = getLogger()
131 rootLogger.disabled = True
132 try:
133 try:
134 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
135 assert 0
136 except model.SafeException, ex:
137 assert 'NO-SUCH-FILE' in str(ex)
138 finally:
139 rootLogger.disabled = False
140 rootLogger.setLevel(WARN)
142 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello')
143 self.assertEquals(None, hello)
145 with output_suppressed():
146 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
147 sys.stdin = Reply("Y\n")
149 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
150 cli.main(['--import', 'Hello'])
151 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
153 # Check we imported the interface after trusting the key
154 hello = iface_cache.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
155 self.assertEquals(1, len(hello.implementations))
157 # Shouldn't need to prompt the second time
158 sys.stdin = None
159 cli.main(['--import', 'Hello'])
161 def testSelections(self):
162 from zeroinstall.injector.cli import _download_missing_selections
163 root = qdom.parse(file("selections.xml"))
164 sels = selections.Selections(root)
165 class Options: dry_run = False
167 with output_suppressed():
168 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
169 sys.stdin = Reply("Y\n")
170 _download_missing_selections(Options(), sels)
171 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
172 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
174 assert sels.download_missing(iface_cache.iface_cache, None) is None
176 def testSelectionsWithFeed(self):
177 from zeroinstall.injector.cli import _download_missing_selections
178 root = qdom.parse(file("selections.xml"))
179 sels = selections.Selections(root)
180 class Options: dry_run = False
182 with output_suppressed():
183 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
184 sys.stdin = Reply("Y\n")
186 from zeroinstall.injector.handler import Handler
187 handler = Handler()
188 fetcher = fetch.Fetcher(handler)
189 handler.wait_for_blocker(fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache.iface_cache))
191 _download_missing_selections(Options(), sels)
192 path = iface_cache.iface_cache.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
193 assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
195 assert sels.download_missing(iface_cache.iface_cache, None) is None
197 def testAcceptKey(self):
198 with output_suppressed():
199 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
200 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
201 handler = DummyHandler())
202 assert policy.need_download()
203 sys.stdin = Reply("Y\n")
204 try:
205 policy.download_and_execute(['Hello'], main = 'Missing')
206 assert 0
207 except model.SafeException, ex:
208 if "HelloWorld/Missing" not in str(ex):
209 raise ex
211 def testDistro(self):
212 with output_suppressed():
213 native_url = 'http://example.com:8000/Native.xml'
215 # Initially, we don't have the feed at all...
216 master_feed = iface_cache.iface_cache.get_feed(native_url)
217 assert master_feed is None, master_feed
219 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
220 self.child = server.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
221 h = DummyHandler()
222 policy = autopolicy.AutoPolicy(native_url, download_only = False, handler = h)
223 assert policy.need_download()
225 solve = policy.solve_with_downloads()
226 h.wait_for_blocker(solve)
227 tasks.check(solve)
229 master_feed = iface_cache.iface_cache.get_feed(native_url)
230 assert master_feed is not None
231 assert master_feed.implementations == {}
233 distro_feed_url = master_feed.get_distro_feed()
234 assert distro_feed_url is not None
235 distro_feed = iface_cache.iface_cache.get_feed(distro_feed_url)
236 assert distro_feed is not None
237 assert len(distro_feed.implementations) == 2, distro_feed.implementations
239 def testWrongSize(self):
240 with output_suppressed():
241 self.child = server.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
242 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
243 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello-wrong-size', download_only = False,
244 handler = DummyHandler())
245 assert policy.need_download()
246 sys.stdin = Reply("Y\n")
247 try:
248 policy.download_and_execute(['Hello'], main = 'Missing')
249 assert 0
250 except model.SafeException, ex:
251 if "Downloaded archive has incorrect size" not in str(ex):
252 raise ex
254 def testRecipe(self):
255 old_out = sys.stdout
256 try:
257 sys.stdout = StringIO()
258 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
259 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
260 try:
261 policy.download_and_execute([])
262 assert False
263 except model.SafeException, ex:
264 if "HelloWorld/Missing" not in str(ex):
265 raise ex
266 finally:
267 sys.stdout = old_out
269 def testSymlink(self):
270 old_out = sys.stdout
271 try:
272 sys.stdout = StringIO()
273 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
274 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
275 handler = DummyHandler())
276 try:
277 policy.download_and_execute([])
278 assert False
279 except model.SafeException, ex:
280 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
281 raise ex
282 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
283 finally:
284 sys.stdout = old_out
286 def testAutopackage(self):
287 old_out = sys.stdout
288 try:
289 sys.stdout = StringIO()
290 self.child = server.handle_requests('HelloWorld.autopackage')
291 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
292 try:
293 policy.download_and_execute([])
294 assert False
295 except model.SafeException, ex:
296 if "HelloWorld/Missing" not in str(ex):
297 raise
298 finally:
299 sys.stdout = old_out
301 def testRecipeFailure(self):
302 old_out = sys.stdout
303 try:
304 sys.stdout = StringIO()
305 self.child = server.handle_requests('*')
306 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
307 handler = DummyHandler())
308 try:
309 policy.download_and_execute([])
310 assert False
311 except download.DownloadError, ex:
312 if "Connection" not in str(ex):
313 raise
314 finally:
315 sys.stdout = old_out
317 def testMirrors(self):
318 old_out = sys.stdout
319 try:
320 sys.stdout = StringIO()
321 getLogger().setLevel(ERROR)
322 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
323 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
324 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
325 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
327 refreshed = policy.solve_with_downloads()
328 policy.handler.wait_for_blocker(refreshed)
329 assert policy.ready
330 finally:
331 sys.stdout = old_out
333 def testReplay(self):
334 old_out = sys.stdout
335 try:
336 sys.stdout = StringIO()
337 getLogger().setLevel(ERROR)
338 iface = iface_cache.iface_cache.get_interface('http://example.com:8000/Hello.xml')
339 mtime = int(os.stat('Hello-new.xml').st_mtime)
340 iface_cache.iface_cache.update_feed_from_network(iface.uri, file('Hello-new.xml').read(), mtime + 10000)
342 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
343 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
344 policy = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml', download_only = False)
345 policy.fetcher.feed_mirror = 'http://example.com:8000/0mirror'
347 # Update from mirror (should ignore out-of-date timestamp)
348 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
349 policy.handler.wait_for_blocker(refreshed)
351 # Update from upstream (should report an error)
352 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
353 try:
354 policy.handler.wait_for_blocker(refreshed)
355 raise Exception("Should have been rejected!")
356 except model.SafeException, ex:
357 assert "New feed's modification time is before old version" in str(ex)
359 # Must finish with the newest version
360 self.assertEquals(1235911552, iface_cache.iface_cache._get_signature_date(iface.uri))
361 finally:
362 sys.stdout = old_out
364 def testBackground(self, verbose = False):
365 p = autopolicy.AutoPolicy('http://example.com:8000/Hello.xml')
366 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
367 p.freshness = 0
368 p.network_use = model.network_minimal
369 p.solver.solve(p.root, arch.get_host_architecture())
370 assert p.ready
372 @tasks.async
373 def choose_download(registed_cb, nid, actions):
374 try:
375 assert actions == ['download', 'Download'], actions
376 registed_cb(nid, 'download')
377 except:
378 import traceback
379 traceback.print_exc()
380 yield None
382 global ran_gui
383 ran_gui = False
384 old_out = sys.stdout
385 try:
386 sys.stdout = StringIO()
387 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
388 my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
389 my_dbus.user_callback = choose_download
390 pid = os.getpid()
391 old_exit = os._exit
392 def my_exit(code):
393 # The background handler runs in the same process
394 # as the tests, so don't let it abort.
395 if os.getpid() == pid:
396 raise SystemExit(code)
397 # But, child download processes are OK
398 old_exit(code)
399 key_info = fetch.DEFAULT_KEY_LOOKUP_SERVER
400 fetch.DEFAULT_KEY_LOOKUP_SERVER = None
401 try:
402 try:
403 os._exit = my_exit
404 background.spawn_background_update(p, verbose)
405 assert False
406 except SystemExit, ex:
407 self.assertEquals(1, ex.code)
408 finally:
409 os._exit = old_exit
410 fetch.DEFAULT_KEY_LOOKUP_SERVER = key_info
411 finally:
412 sys.stdout = old_out
413 assert ran_gui
415 def testBackgroundVerbose(self):
416 self.testBackground(verbose = True)
418 if __name__ == '__main__':
419 unittest.main()