Check network status with NetworkManager before doing background updates.
[zeroinstall/zeroinstall-mseaborn.git] / tests / testdownload.py
blob bffefdb1426c879519499e9bae9873bf9c45d185
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, DEBUG, INFO, WARN, ERROR
8 sys.path.insert(0, '..')
10 from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler, background, arch
11 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
12 from zeroinstall.support import basedir, tasks
13 import data
14 import my_dbus
16 import server
# Set to True by raise_gui(); tests reset it and then check that the GUI hook ran.
ran_gui = False


def raise_gui(*args):
    """Replacement for background._exec_gui: just record that it was called."""
    global ran_gui
    ran_gui = True


# Patch the background updater: _detach always returns False and the GUI
# launcher is swapped for the recording stub above.
background._exec_gui = raise_gui
background._detach = lambda: False

# Make every dbus import used here resolve to the local my_dbus stub module.
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
class Reply:
    """Tiny stand-in for sys.stdin whose readline() always gives the same text."""

    def __init__(self, reply):
        # reply: the line handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply; it is not consumed, so each call returns it again."""
        return self.reply
36 class DummyHandler(handler.Handler):
37 __slots__ = ['ex', 'tb']
39 def __init__(self):
40 handler.Handler.__init__(self)
41 self.ex = None
43 def wait_for_blocker(self, blocker):
44 self.ex = None
45 handler.Handler.wait_for_blocker(self, blocker)
46 if self.ex:
47 raise self.ex, None, self.tb
49 def report_error(self, ex, tb = None):
50 assert self.ex is None, self.ex
51 self.ex = ex
52 self.tb = tb
54 #import traceback
55 #traceback.print_exc()
57 class TestDownload(BaseTest):
58 def setUp(self):
59 BaseTest.setUp(self)
61 stream = tempfile.TemporaryFile()
62 stream.write(data.thomas_key)
63 stream.seek(0)
64 gpg.import_key(stream)
65 self.child = None
67 trust.trust_db.watchers = []
69 def tearDown(self):
70 BaseTest.tearDown(self)
71 if self.child is not None:
72 os.kill(self.child, signal.SIGTERM)
73 os.waitpid(self.child, 0)
74 self.child = None
76 def testRejectKey(self):
77 old_out = sys.stdout
78 try:
79 sys.stdout = StringIO()
80 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
81 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
82 handler = DummyHandler())
83 assert policy.need_download()
84 sys.stdin = Reply("N\n")
85 try:
86 policy.download_and_execute(['Hello'])
87 assert 0
88 except model.SafeException, ex:
89 if "Not signed with a trusted key" not in str(ex):
90 raise ex
91 finally:
92 sys.stdout = old_out
94 def testRejectKeyXML(self):
95 old_out = sys.stdout
96 try:
97 sys.stdout = StringIO()
98 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
99 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
100 handler = DummyHandler())
101 assert policy.need_download()
102 sys.stdin = Reply("N\n")
103 try:
104 policy.download_and_execute(['Hello'])
105 assert 0
106 except model.SafeException, ex:
107 if "Not signed with a trusted key" not in str(ex):
108 raise
109 finally:
110 sys.stdout = old_out
112 def testImport(self):
113 old_out = sys.stdout
114 try:
115 from zeroinstall.injector import cli
116 import logging
118 rootLogger = getLogger()
119 rootLogger.disabled = True
120 try:
121 try:
122 cli.main(['--import', '-v', 'NO-SUCH-FILE'])
123 assert 0
124 except model.SafeException, ex:
125 assert 'NO-SUCH-FILE' in str(ex)
126 finally:
127 rootLogger.disabled = False
128 rootLogger.setLevel(WARN)
130 hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
131 self.assertEquals(0, len(hello.implementations))
133 sys.stdout = StringIO()
134 self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
135 sys.stdin = Reply("Y\n")
137 assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
138 cli.main(['--import', 'Hello'])
139 assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
141 # Check we imported the interface after trusting the key
142 reader.update_from_cache(hello)
143 self.assertEquals(1, len(hello.implementations))
145 # Shouldn't need to prompt the second time
146 sys.stdin = None
147 cli.main(['--import', 'Hello'])
148 finally:
149 sys.stdout = old_out
151 def testAcceptKey(self):
152 old_out = sys.stdout
153 try:
154 sys.stdout = StringIO()
155 self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
156 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
157 handler = DummyHandler())
158 assert policy.need_download()
159 sys.stdin = Reply("Y\n")
160 try:
161 policy.download_and_execute(['Hello'], main = 'Missing')
162 assert 0
163 except model.SafeException, ex:
164 if "HelloWorld/Missing" not in str(ex):
165 raise ex
166 finally:
167 sys.stdout = old_out
169 def testRecipe(self):
170 old_out = sys.stdout
171 try:
172 sys.stdout = StringIO()
173 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
174 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
175 try:
176 policy.download_and_execute([])
177 assert False
178 except model.SafeException, ex:
179 if "HelloWorld/Missing" not in str(ex):
180 raise ex
181 finally:
182 sys.stdout = old_out
184 def testSymlink(self):
185 old_out = sys.stdout
186 try:
187 sys.stdout = StringIO()
188 self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
189 policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
190 handler = DummyHandler())
191 try:
192 policy.download_and_execute([])
193 assert False
194 except model.SafeException, ex:
195 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
196 raise ex
197 self.assertEquals(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
198 finally:
199 sys.stdout = old_out
201 def testAutopackage(self):
202 old_out = sys.stdout
203 try:
204 sys.stdout = StringIO()
205 self.child = server.handle_requests('HelloWorld.autopackage')
206 policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
207 try:
208 policy.download_and_execute([])
209 assert False
210 except model.SafeException, ex:
211 if "HelloWorld/Missing" not in str(ex):
212 raise
213 finally:
214 sys.stdout = old_out
216 def testRecipeFailure(self):
217 old_out = sys.stdout
218 try:
219 sys.stdout = StringIO()
220 self.child = server.handle_requests('*')
221 policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
222 handler = DummyHandler())
223 try:
224 policy.download_and_execute([])
225 assert False
226 except download.DownloadError, ex:
227 if "Connection" not in str(ex):
228 raise
229 finally:
230 sys.stdout = old_out
232 def testMirrors(self):
233 old_out = sys.stdout
234 try:
235 sys.stdout = StringIO()
236 getLogger().setLevel(ERROR)
237 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
238 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
239 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
240 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
242 refreshed = policy.solve_with_downloads()
243 policy.handler.wait_for_blocker(refreshed)
244 assert policy.ready
245 finally:
246 sys.stdout = old_out
248 def testReplay(self):
249 old_out = sys.stdout
250 try:
251 sys.stdout = StringIO()
252 getLogger().setLevel(ERROR)
253 iface = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello.xml')
254 mtime = int(os.stat('Hello-new.xml').st_mtime)
255 iface_cache.iface_cache.update_interface_from_network(iface, file('Hello-new.xml').read(), mtime + 10000)
257 trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
258 self.child = server.handle_requests(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
259 policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False)
260 policy.fetcher.feed_mirror = 'http://localhost:8000/0mirror'
262 # Update from mirror (should ignore out-of-date timestamp)
263 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
264 policy.handler.wait_for_blocker(refreshed)
266 # Update from upstream (should report an error)
267 refreshed = policy.fetcher.download_and_import_feed(iface.uri, iface_cache.iface_cache)
268 try:
269 policy.handler.wait_for_blocker(refreshed)
270 raise Exception("Should have been rejected!")
271 except model.SafeException, ex:
272 assert "New interface's modification time is before old version" in str(ex)
274 # Must finish with the newest version
275 self.assertEquals(1209206132, iface_cache.iface_cache._get_signature_date(iface.uri))
276 finally:
277 sys.stdout = old_out
279 def testBackground(self, verbose = False):
280 p = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml')
281 reader.update(iface_cache.iface_cache.get_interface(p.root), 'Hello.xml')
282 p.freshness = 0
283 p.network_use = model.network_minimal
284 p.solver.solve(p.root, arch.get_host_architecture())
285 assert p.ready
287 @tasks.async
288 def choose_download(registed_cb, nid, actions):
289 try:
290 assert actions == ['download', 'Download'], actions
291 registed_cb(nid, 'download')
292 except:
293 import traceback
294 traceback.print_exc()
295 yield None
297 global ran_gui
298 ran_gui = False
299 old_out = sys.stdout
300 try:
301 sys.stdout = StringIO()
302 self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
303 my_dbus.user_callback = choose_download
304 pid = os.getpid()
305 old_exit = os._exit
306 def my_exit(code):
307 # The background handler runs in the same process
308 # as the tests, so don't let it abort.
309 if os.getpid() == pid:
310 raise SystemExit(code)
311 # But, child download processes are OK
312 old_exit(code)
313 try:
314 try:
315 os._exit = my_exit
316 background.spawn_background_update(p, verbose)
317 assert False
318 except SystemExit, ex:
319 self.assertEquals(1, ex.code)
320 finally:
321 os._exit = old_exit
322 finally:
323 sys.stdout = old_out
324 assert ran_gui
326 def testBackgroundVerbose(self):
327 self.testBackground(verbose = True)
# Suite of every test* method of TestDownload, for use by an external runner.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDownload)

if __name__ == '__main__':
    # Always run verbosely when this file is executed directly.
    sys.argv.append('-v')
    unittest.main()