2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
15 from zeroinstall
.injector
.policy
import Policy
16 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
17 from zeroinstall
.support
import basedir
, tasks
18 from zeroinstall
.injector
import fetch
# Replace two hooks on the `background` module for the duration of the tests:
# `_detach` becomes a stub that always returns False, and `_exec_gui` is
# redirected to `raise_gui` (which must already be defined above this point).
def _stay_attached():
    return False


background._detach = _stay_attached
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Silence sys.stdout and sys.stderr while the ``with`` body runs.

    Both streams are replaced by throw-away StringIO buffers and are always
    put back when the body finishes, whether it succeeds or raises.

    Ordinary exceptions propagate unchanged.  Anything that derives from
    BaseException but not Exception (e.g. SystemExit) is converted into a
    plain Exception so that it is reported as a test error instead of
    terminating the whole test run.
    """
    old_stdout = sys.stdout
    old_stderr = sys.stderr
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        try:
            yield
        except Exception:
            # Normal errors and assertion failures keep their own type.
            raise
        except BaseException as ex:
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr
50 def __init__(self
, reply
):
def download_and_execute(policy, prog_args, main=None):
    """Solve for *policy*, fetch what is missing, then run the selection.

    policy    -- object providing solve_and_download_impls(), .solver and .config
    prog_args -- argument list handed to the executed program
    main      -- optional override forwarded to run.execute_selections()
    """
    downloaded = policy.solve_and_download_impls()
    # NOTE(review): solve_and_download_impls() presumably returns None when
    # there is nothing to fetch; only wait when a blocker was handed back.
    if downloaded:
        tasks.wait_for_blocker(downloaded)
    run.execute_selections(policy.solver.selections, prog_args,
                           stores=policy.config.stores, main=main)
64 return 3 # NM_STATUS_CONNECTED
# Pid of the helper server started by run_server(), or None when no server
# is running.
server_process = None


def kill_server_process():
    """Terminate and reap the test server, if one is running.

    Does nothing when no server has been started.  The module-level
    ``server_process`` is cleared before signalling so that run_server(),
    which asserts that it is None, can be called again afterwards.
    """
    global server_process
    if server_process is None:
        return
    pid, server_process = server_process, None
    os.kill(pid, signal.SIGTERM)
    # Collect the exit status so the child does not linger as a zombie.
    os.waitpid(pid, 0)
def run_server(*args):
    """Start the helper test server and remember its handle.

    All arguments are forwarded unchanged to server.handle_requests().  The
    returned value is stored in the module-level ``server_process`` (it is
    later passed to os.kill() by kill_server_process(), i.e. treated as a
    pid).  Only one server may be active at a time.
    """
    # Without this declaration the assignment below would make the name
    # local and the assert would raise UnboundLocalError.
    global server_process
    assert server_process is None, "a test server is already running"
    server_process = server.handle_requests(*args)
79 class TestDownload(BaseTest
):
83 self
.config
.handler
.allow_downloads
= True
84 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
86 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
88 stream
= tempfile
.TemporaryFile()
89 stream
.write(data
.thomas_key
)
91 gpg
.import_key(stream
)
94 trust
.trust_db
.watchers
= []
97 BaseTest
.tearDown(self
)
def testRejectKey(self):
    # Answer "N" at the key-trust prompt: the run must fail because the feed
    # has no usable implementations and the handler recorded the untrusted
    # signature.
    # NOTE(review): the try / re-raise / failure-guard lines are not present
    # in the reviewed text and were restored from the visible except clause.
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("N\n")
        try:
            download_and_execute(policy, ['Hello'])
        except model.SafeException as ex:
            # Anything other than the two expected messages is a real error.
            if "has no usable implementations" not in str(ex):
                raise
            if "Not signed with a trusted key" not in str(policy.handler.ex):
                raise
            # Clear the recorded error now that it has been checked.
            self.config.handler.ex = None
        else:
            self.fail("Feed signed with a rejected key was accepted")
def testRejectKeyXML(self):
    # Same scenario as rejecting the key for 'Hello', but for the
    # 'Hello.xml' feed served under the example.com URL.
    # NOTE(review): the try / re-raise / failure-guard lines are not present
    # in the reviewed text and were restored from the visible except clause.
    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("N\n")
        try:
            download_and_execute(policy, ['Hello'])
        except model.SafeException as ex:
            # Anything other than the two expected messages is a real error.
            if "has no usable implementations" not in str(ex):
                raise
            if "Not signed with a trusted key" not in str(policy.handler.ex):
                raise
            # Clear the recorded error now that it has been checked.
            self.config.handler.ex = None
        else:
            self.fail("Feed signed with a rejected key was accepted")
def testImport(self):
    # Exercise `--import`: a missing file must be reported by name, and
    # importing 'Hello' must prompt for, and then record, trust in the key.
    # NOTE(review): the try/finally scaffolding around the first cli.main()
    # call is not present in the reviewed text and was restored so that the
    # root logger is always re-enabled.
    from zeroinstall.injector import cli

    rootLogger = getLogger()
    rootLogger.disabled = True
    try:
        cli.main(['--import', '-v', 'NO-SUCH-FILE'], config=self.config)
    except model.SafeException as ex:
        assert 'NO-SUCH-FILE' in str(ex)
    else:
        self.fail("Importing a non-existent file did not fail")
    finally:
        rootLogger.disabled = False
        rootLogger.setLevel(WARN)

    hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
    self.assertEqual(None, hello)

    with output_suppressed():
        run_server('6FCF121BE2390E0B.gpg')
        sys.stdin = Reply("Y\n")

        assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
        cli.main(['--import', 'Hello'], config=self.config)
        assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

        # Check we imported the interface after trusting the key
        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force=True)
        self.assertEqual(1, len(hello.implementations))

        self.assertEqual(None, hello.local_path)

        # Shouldn't need to prompt the second time
        # NOTE(review): stdin still holds the "Y" reply here, so a second
        # prompt would go unnoticed; consider replacing sys.stdin first.
        cli.main(['--import', 'Hello'], config=self.config)
def testSelections(self):
    # `--download-only selections.xml` must fetch the selected
    # implementation into the store.
    from zeroinstall.injector import cli

    # Close the selections file once it has been parsed.
    with open("selections.xml") as stream:
        root = qdom.parse(stream)
    sels = selections.Selections(root)
    digests = sels.selections['http://example.com:8000/Hello.xml'].digests

    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")

        # NOTE(review): the reviewed text discards the result of this lookup;
        # it is presumably meant to show that nothing is cached before the
        # download (NotStored is imported but otherwise unused) - confirm.
        self.assertRaises(NotStored, self.config.stores.lookup_any, digests)

        cli.main(['--download-only', 'selections.xml'], config=self.config)
        path = self.config.stores.lookup_any(digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

        assert sels.download_missing(self.config) is None
def testHelpers(self):
    # helpers.ensure_cached() must leave the chosen implementation of
    # Hello.xml unpacked in the store with nothing left to download.
    from zeroinstall import helpers

    feed_url = 'http://example.com:8000/Hello.xml'
    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")
        chosen = helpers.ensure_cached('http://example.com:8000/Hello.xml', config=self.config)
        digests = chosen.selections[feed_url].digests
        impl_dir = self.config.stores.lookup_any(digests)
        main_path = os.path.join(impl_dir, 'HelloWorld', 'main')
        assert os.path.exists(main_path)
        assert chosen.download_missing(self.config) is None
def testSelectionsWithFeed(self):
    # As the plain selections download, but with the feed itself already
    # imported into the interface cache before `--download-only` runs.
    from zeroinstall.injector import cli

    # Close the selections file once it has been parsed.
    with open("selections.xml") as stream:
        root = qdom.parse(stream)
    sels = selections.Selections(root)

    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")

        # Fetch and import the feed first.
        feed_import = self.config.fetcher.download_and_import_feed(
            'http://example.com:8000/Hello.xml', self.config.iface_cache)
        self.config.handler.wait_for_blocker(feed_import)

        cli.main(['--download-only', 'selections.xml'], config=self.config)
        path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

        assert sels.download_missing(self.config) is None
def testAcceptKey(self):
    # Answer "Y" at the key-trust prompt; the run is then expected to fail
    # only with an error mentioning "HelloWorld/Missing" (main='Missing').
    # NOTE(review): the try / re-raise / failure-guard lines are not present
    # in the reviewed text and were restored from the visible except clause.
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("Y\n")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            self.fail("Running the missing main did not fail")
def testAutoAcceptKey(self):
    # With auto_approve_keys enabled, stdin supplies an empty reply; the run
    # is expected to fail only with an error mentioning "HelloWorld/Missing".
    # NOTE(review): the try / re-raise / failure-guard lines are not present
    # in the reviewed text and were restored from the visible except clause.
    self.config.auto_approve_keys = True
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            self.fail("Running the missing main did not fail")
def testDistro(self):
    # Solving for Native.xml must download the master feed (which lists no
    # implementations itself) and expose a distribution feed with two.
    with output_suppressed():
        native_url = 'http://example.com:8000/Native.xml'
        cache = self.config.iface_cache

        # Initially, we don't have the feed at all...
        feed = cache.get_feed(native_url)
        assert feed is None, feed

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy(native_url, config=self.config)
        assert policy.need_download()

        self.config.handler.wait_for_blocker(policy.solve_with_downloads())

        # ...now the master feed is cached, without implementations of its own
        feed = cache.get_feed(native_url)
        assert feed is not None
        assert feed.implementations == {}

        distro_url = feed.get_distro_feed()
        assert distro_url is not None
        distro = cache.get_feed(distro_url)
        assert distro is not None
        assert len(distro.implementations) == 2, distro.implementations
def testWrongSize(self):
    # The 'Hello-wrong-size' feed must be rejected with an
    # "incorrect size" error once its archive has been downloaded.
    # NOTE(review): the try / re-raise / failure-guard lines are not present
    # in the reviewed text and were restored from the visible except clause.
    with output_suppressed():
        run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                   '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello-wrong-size', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("Y\n")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "Downloaded archive has incorrect size" not in str(ex):
                raise
        else:
            self.fail("Archive with the wrong size was accepted")
def testRecipe(self):
    # Run Recipe.xml against a server offering its archives; the only
    # acceptable failure is the one mentioning "HelloWorld/Missing".
    # NOTE(review): saving/restoring sys.stdout and the try / re-raise lines
    # are not present in the reviewed text; they were restored so that the
    # replaced stdout never leaks into later tests.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
        policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            self.fail("Running the missing main did not fail")
    finally:
        sys.stdout = old_out
def testSymlink(self):
    # RecipeSymlink.xml must be refused with the "unpack dir over symlink"
    # error, and nothing may end up in the implementation cache.
    # NOTE(review): saving/restoring sys.stdout and the try / re-raise lines
    # are not present in the reviewed text; they were restored so that the
    # replaced stdout never leaks into later tests.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
        policy = Policy(os.path.abspath('RecipeSymlink.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                raise
        else:
            self.fail("Archive unpacking over a symlink was accepted")
        self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
    finally:
        sys.stdout = old_out
def testAutopackage(self):
    # Run Autopackage.xml against a server offering the .autopackage file;
    # the only acceptable failure is the one mentioning "HelloWorld/Missing".
    # NOTE(review): saving/restoring sys.stdout and the try / re-raise lines
    # are not present in the reviewed text; they were restored so that the
    # replaced stdout never leaks into later tests.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server('HelloWorld.autopackage')
        policy = Policy(os.path.abspath('Autopackage.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            self.fail("Running the missing main did not fail")
    finally:
        sys.stdout = old_out
def testRecipeFailure(self):
    # Downloading the archives for Recipe.xml must fail with a DownloadError
    # whose message mentions "Connection".
    # NOTE(review): no run_server() call is visible in the reviewed text for
    # this test; confirm whether the error is meant to come from a server
    # that drops the connection or from no server listening at all.
    # NOTE(review): saving/restoring sys.stdout and the try / re-raise lines
    # were restored so that the replaced stdout never leaks into later tests.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except download.DownloadError as ex:
            if "Connection" not in str(ex):
                raise
        else:
            self.fail("Download unexpectedly succeeded")
    finally:
        sys.stdout = old_out
def testMirrors(self):
    # The primary URL for Hello.xml answers 404, so solving has to succeed
    # using the configured feed mirror instead.
    # NOTE(review): saving/restoring sys.stdout is not present in the
    # reviewed text; it was restored so that the replaced stdout never leaks
    # into later tests.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        getLogger().setLevel(ERROR)
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        self.config.feed_mirror = 'http://example.com:8000/0mirror'

        refreshed = policy.solve_with_downloads()
        policy.handler.wait_for_blocker(refreshed)
        # NOTE(review): readiness check added so the test actually verifies
        # that the mirror produced a usable selection - confirm.
        assert policy.ready, policy.solver.get_failure_reason()
    finally:
        sys.stdout = old_out
def testReplay(self):
    # Seed the cache with Hello-new.xml, then check that an older copy of the
    # feed is ignored when it comes from the mirror and rejected with an
    # error when it comes from upstream.
    # NOTE(review): saving/restoring sys.stdout and the `try:` matching the
    # visible except clause are not present in the reviewed text and were
    # restored.
    old_out = sys.stdout
    try:
        sys.stdout = StringIO()
        getLogger().setLevel(ERROR)
        iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
        mtime = int(os.stat('Hello-new.xml').st_mtime)
        # Read the newer feed and close the file straight away.
        with open('Hello-new.xml') as stream:
            new_xml = stream.read()
        self.config.iface_cache.update_feed_from_network(iface.uri, new_xml, mtime + 10000)

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        self.config.feed_mirror = 'http://example.com:8000/0mirror'

        # Update from mirror (should ignore out-of-date timestamp)
        refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
        policy.handler.wait_for_blocker(refreshed)

        # Update from upstream (should report an error)
        refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
        try:
            policy.handler.wait_for_blocker(refreshed)
            raise Exception("Should have been rejected!")
        except model.SafeException as ex:
            assert "New feed's modification time is before old version" in str(ex)

        # Must finish with the newest version
        self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
    finally:
        sys.stdout = old_out
392 def testBackground(self
, verbose
= False):
393 p
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
394 self
.import_feed(p
.root
, 'Hello.xml')
396 p
.network_use
= model
.network_minimal
397 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
398 assert p
.ready
, p
.solver
.get_failure_reason()
401 def choose_download(registed_cb
, nid
, actions
):
403 assert actions
== ['download', 'Download'], actions
404 registed_cb(nid
, 'download')
407 traceback
.print_exc()
414 sys
.stdout
= StringIO()
415 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
416 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
417 my_dbus
.user_callback
= choose_download
421 # The background handler runs in the same process
422 # as the tests, so don't let it abort.
423 if os
.getpid() == pid
:
424 raise SystemExit(code
)
425 # But, child download processes are OK
427 from zeroinstall
.injector
import config
428 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
429 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
433 background
.spawn_background_update(p
, verbose
)
435 except SystemExit as ex
:
436 self
.assertEqual(1, ex
.code
)
439 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
444 def testBackgroundVerbose(self
):
445 self
.testBackground(verbose
= True)
if __name__ == '__main__':
    # NOTE(review): the reviewed text only shows the kill_server_process()
    # call under this guard; running the suite via unittest.main() is
    # restored here because `unittest` is imported and otherwise unused.
    try:
        unittest.main()
    finally:
        # unittest.main() leaves via SystemExit, so clean up in `finally` to
        # make sure no helper server outlives the test run.
        kill_server_process()