2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
15 from zeroinstall
.injector
.requirements
import Requirements
16 from zeroinstall
.injector
.driver
import Driver
17 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
18 from zeroinstall
.support
import basedir
, tasks
19 from zeroinstall
.injector
import fetch
29 background
._detach
= lambda: False
30 background
._exec
_gui
= raise_gui
33 def output_suppressed():
34 old_stdout
= sys
.stdout
35 old_stderr
= sys
.stderr
37 sys
.stdout
= StringIO()
38 sys
.stderr
= StringIO()
43 except BaseException
as ex
:
44 # Don't abort unit-tests if someone raises SystemExit
45 raise Exception(str(type(ex
)) + " " + str(ex
))
47 sys
.stdout
= old_stdout
48 sys
.stderr
= old_stderr
51 def __init__(self
, reply
):
57 def download_and_execute(driver
, prog_args
, main
= None):
58 downloaded
= driver
.solve_and_download_impls()
60 tasks
.wait_for_blocker(downloaded
)
61 run
.execute_selections(driver
.solver
.selections
, prog_args
, stores
= driver
.config
.stores
, main
= main
)
65 return 3 # NM_STATUS_CONNECTED
68 def kill_server_process():
70 if server_process
is not None:
71 os
.kill(server_process
, signal
.SIGTERM
)
72 os
.waitpid(server_process
, 0)
75 def run_server(*args
):
77 assert server_process
is None
78 server_process
= server
.handle_requests(*args
)
80 class TestDownload(BaseTest
):
84 self
.config
.handler
.allow_downloads
= True
85 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
87 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
89 stream
= tempfile
.TemporaryFile()
90 stream
.write(data
.thomas_key
)
92 gpg
.import_key(stream
)
95 trust
.trust_db
.watchers
= []
98 BaseTest
.tearDown(self
)
101 def testRejectKey(self
):
102 with
output_suppressed():
103 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
104 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
105 assert driver
.need_download()
106 sys
.stdin
= Reply("N\n")
108 download_and_execute(driver
, ['Hello'])
110 except model
.SafeException
as ex
:
111 if "has no usable implementations" not in str(ex
):
113 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
115 self
.config
.handler
.ex
= None
117 def testRejectKeyXML(self
):
118 with
output_suppressed():
119 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
120 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
121 assert driver
.need_download()
122 sys
.stdin
= Reply("N\n")
124 download_and_execute(driver
, ['Hello'])
126 except model
.SafeException
as ex
:
127 if "has no usable implementations" not in str(ex
):
129 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
131 self
.config
.handler
.ex
= None
133 def testImport(self
):
134 from zeroinstall
.injector
import cli
136 rootLogger
= getLogger()
137 rootLogger
.disabled
= True
140 cli
.main(['--import', '-v', 'NO-SUCH-FILE'], config
= self
.config
)
142 except model
.SafeException
as ex
:
143 assert 'NO-SUCH-FILE' in str(ex
)
145 rootLogger
.disabled
= False
146 rootLogger
.setLevel(WARN
)
148 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
149 self
.assertEqual(None, hello
)
151 with
output_suppressed():
152 run_server('6FCF121BE2390E0B.gpg')
153 sys
.stdin
= Reply("Y\n")
155 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 cli
.main(['--import', 'Hello'], config
= self
.config
)
157 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
159 # Check we imported the interface after trusting the key
160 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
161 self
.assertEqual(1, len(hello
.implementations
))
163 self
.assertEqual(None, hello
.local_path
)
165 # Shouldn't need to prompt the second time
167 cli
.main(['--import', 'Hello'], config
= self
.config
)
169 def testSelections(self
):
170 from zeroinstall
.injector
import cli
171 root
= qdom
.parse(open("selections.xml"))
172 sels
= selections
.Selections(root
)
173 class Options
: dry_run
= False
175 with
output_suppressed():
176 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
177 sys
.stdin
= Reply("Y\n")
179 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
183 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
184 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
185 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
187 assert sels
.download_missing(self
.config
) is None
189 def testHelpers(self
):
190 from zeroinstall
import helpers
192 with
output_suppressed():
193 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
194 sys
.stdin
= Reply("Y\n")
195 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml', config
= self
.config
)
196 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
197 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
198 assert sels
.download_missing(self
.config
) is None
200 def testSelectionsWithFeed(self
):
201 from zeroinstall
.injector
import cli
202 root
= qdom
.parse(open("selections.xml"))
203 sels
= selections
.Selections(root
)
205 with
output_suppressed():
206 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
207 sys
.stdin
= Reply("Y\n")
209 tasks
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
211 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
212 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
213 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
215 assert sels
.download_missing(self
.config
) is None
217 def testAcceptKey(self
):
218 with
output_suppressed():
219 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
220 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
221 assert driver
.need_download()
222 sys
.stdin
= Reply("Y\n")
224 download_and_execute(driver
, ['Hello'], main
= 'Missing')
226 except model
.SafeException
as ex
:
227 if "HelloWorld/Missing" not in str(ex
):
230 def testAutoAcceptKey(self
):
231 self
.config
.auto_approve_keys
= True
232 with
output_suppressed():
233 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
234 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
235 assert driver
.need_download()
236 sys
.stdin
= Reply("")
238 download_and_execute(driver
, ['Hello'], main
= 'Missing')
240 except model
.SafeException
as ex
:
241 if "HelloWorld/Missing" not in str(ex
):
244 def testDistro(self
):
245 with
output_suppressed():
246 native_url
= 'http://example.com:8000/Native.xml'
248 # Initially, we don't have the feed at all...
249 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
250 assert master_feed
is None, master_feed
252 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
253 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
254 driver
= Driver(requirements
= Requirements(native_url
), config
= self
.config
)
255 assert driver
.need_download()
257 solve
= driver
.solve_with_downloads()
258 tasks
.wait_for_blocker(solve
)
261 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
262 assert master_feed
is not None
263 assert master_feed
.implementations
== {}
265 distro_feed_url
= master_feed
.get_distro_feed()
266 assert distro_feed_url
is not None
267 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
268 assert distro_feed
is not None
269 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
271 def testWrongSize(self
):
272 with
output_suppressed():
273 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
274 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
275 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello-wrong-size'), config
= self
.config
)
276 assert driver
.need_download()
277 sys
.stdin
= Reply("Y\n")
279 download_and_execute(driver
, ['Hello'], main
= 'Missing')
281 except model
.SafeException
as ex
:
282 if "Downloaded archive has incorrect size" not in str(ex
):
285 def testRecipe(self
):
288 sys
.stdout
= StringIO()
289 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
290 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
292 download_and_execute(driver
, [])
294 except model
.SafeException
as ex
:
295 if "HelloWorld/Missing" not in str(ex
):
300 def testSymlink(self
):
303 sys
.stdout
= StringIO()
304 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
305 driver
= Driver(requirements
= Requirements(os
.path
.abspath('RecipeSymlink.xml')), config
= self
.config
)
307 download_and_execute(driver
, [])
309 except model
.SafeException
as ex
:
310 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
312 self
.assertEqual(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
316 def testAutopackage(self
):
319 sys
.stdout
= StringIO()
320 run_server('HelloWorld.autopackage')
321 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Autopackage.xml')), config
= self
.config
)
323 download_and_execute(driver
, [])
325 except model
.SafeException
as ex
:
326 if "HelloWorld/Missing" not in str(ex
):
331 def testRecipeFailure(self
):
334 sys
.stdout
= StringIO()
336 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
338 download_and_execute(driver
, [])
340 except download
.DownloadError
as ex
:
341 if "Connection" not in str(ex
):
346 def testMirrors(self
):
349 sys
.stdout
= StringIO()
350 getLogger().setLevel(ERROR
)
351 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
352 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
353 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
354 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
356 refreshed
= driver
.solve_with_downloads()
357 tasks
.wait_for_blocker(refreshed
)
358 assert driver
.solver
.ready
362 def testReplay(self
):
365 sys
.stdout
= StringIO()
366 getLogger().setLevel(ERROR
)
367 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
368 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
369 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, open('Hello-new.xml').read(), mtime
+ 10000)
371 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
372 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
373 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
374 self
.config
.feed_mirror
= 'http://example.com:8000/0mirror'
376 # Update from mirror (should ignore out-of-date timestamp)
377 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
378 tasks
.wait_for_blocker(refreshed
)
380 # Update from upstream (should report an error)
381 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
383 tasks
.wait_for_blocker(refreshed
)
384 raise Exception("Should have been rejected!")
385 except model
.SafeException
as ex
:
386 assert "New feed's modification time is before old version" in str(ex
)
388 # Must finish with the newest version
389 self
.assertEqual(1235911552, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
393 def testBackground(self
, verbose
= False):
394 r
= Requirements('http://example.com:8000/Hello.xml')
395 d
= Driver(requirements
= r
, config
= self
.config
)
396 self
.import_feed(r
.interface_uri
, 'Hello.xml')
397 self
.config
.freshness
= 0
398 self
.config
.network_use
= model
.network_minimal
399 d
.solver
.solve(r
.interface_uri
, arch
.get_host_architecture())
400 assert d
.solver
.ready
, d
.solver
.get_failure_reason()
403 def choose_download(registed_cb
, nid
, actions
):
405 assert actions
== ['download', 'Download'], actions
406 registed_cb(nid
, 'download')
409 traceback
.print_exc()
416 sys
.stdout
= StringIO()
417 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
418 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
419 my_dbus
.user_callback
= choose_download
423 # The background handler runs in the same process
424 # as the tests, so don't let it abort.
425 if os
.getpid() == pid
:
426 raise SystemExit(code
)
427 # But, child download processes are OK
429 from zeroinstall
.injector
import config
430 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
431 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
435 background
.spawn_background_update(d
, verbose
)
437 except SystemExit as ex
:
438 self
.assertEqual(1, ex
.code
)
441 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
446 def testBackgroundVerbose(self
):
447 self
.testBackground(verbose
= True)
449 if __name__
== '__main__':
453 kill_server_process()