2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
15 from zeroinstall
.injector
.policy
import Policy
16 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
17 from zeroinstall
.support
import basedir
, tasks
18 from zeroinstall
.injector
import fetch
# Test-time overrides of two private hooks in zeroinstall.injector.background.
# _detach is replaced by a stub that always returns False.
# NOTE(review): presumably this keeps the updater inside the test process
# instead of detaching - confirm against the background module.
background._detach = lambda: False
# GUI launches are routed to raise_gui (defined outside this extract).
background._exec_gui = raise_gui
@contextmanager
def output_suppressed():
    """Run the enclosed block with sys.stdout and sys.stderr swallowed.

    Both streams are pointed at throw-away StringIO buffers for the duration
    of the ``with`` block and are always put back afterwards.

    Ordinary exceptions pass through untouched.  Anything that is only a
    BaseException (SystemExit, KeyboardInterrupt, ...) is re-raised as a plain
    Exception whose message is the original type followed by its text.
    """
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    try:
        sys.stdout = StringIO()
        sys.stderr = StringIO()
        try:
            yield
        except Exception:
            # Regular failures keep their own type so callers can catch them.
            raise
        except BaseException as ex:
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception(str(type(ex)) + " " + str(ex))
    finally:
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr
50 def __init__(self
, reply
):
def download_and_execute(policy, prog_args, main=None):
    """Solve for *policy*, wait for any downloads, then run the selections.

    policy    -- object supplying solve_and_download_impls(), .config and
                 .solver
    prog_args -- argument list passed on to run.execute_selections()
    main      -- optional entry point override, passed through unchanged
    """
    downloaded = policy.solve_and_download_impls()
    # NOTE(review): guard restored at a gap in this extract; it assumes
    # solve_and_download_impls() returns a false value when there is nothing
    # to wait for - confirm against the Policy API.
    if downloaded:
        policy.config.handler.wait_for_blocker(downloaded)
    run.execute_selections(policy.solver.selections, prog_args,
                           stores=policy.config.stores, main=main)
64 return 3 # NM_STATUS_CONNECTED
def kill_server_process():
    """Stop the server recorded in ``server_process``, if there is one.

    Sends SIGTERM to the stored pid, waits for that process with waitpid(),
    and clears the module-level variable.  Does nothing when no server is
    recorded.
    """
    global server_process
    if server_process is None:
        return
    os.kill(server_process, signal.SIGTERM)
    os.waitpid(server_process, 0)
    # run_server() asserts this is None before it starts another server.
    server_process = None
def run_server(*args):
    """Start a server through server.handle_requests() and record its result.

    args -- forwarded unchanged to server.handle_requests()

    The return value is stored in the module-level ``server_process``, which
    kill_server_process() later uses as a pid.  Only one server may be active:
    the assertion fails if a previous one has not been cleaned up.
    """
    # Without the global declaration the assert below would read an
    # unassigned local, because the name is assigned in this function.
    global server_process
    assert server_process is None
    server_process = server.handle_requests(*args)
79 class TestDownload(BaseTest
):
83 self
.config
.handler
.allow_downloads
= True
84 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
86 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
88 stream
= tempfile
.TemporaryFile()
89 stream
.write(data
.thomas_key
)
91 gpg
.import_key(stream
)
93 trust
.trust_db
.watchers
= []
96 BaseTest
.tearDown(self
)
def testRejectKey(self):
    # stdin is primed with "N"; the run must then fail with a SafeException
    # about unusable implementations, and the handler must have recorded an
    # untrusted-key error.
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("N\n")
        try:
            download_and_execute(policy, ['Hello'])
        except model.SafeException as ex:
            if "has no usable implementations" not in str(ex):
                raise
            if "Not signed with a trusted key" not in str(policy.handler.ex):
                raise
            # The recorded error was the expected one, so clear it.
            self.config.handler.ex = None
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
def testRejectKeyXML(self):
    # Same scenario as testRejectKey, but for the Hello.xml feed addressed
    # through example.com:8000.
    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("N\n")
        try:
            download_and_execute(policy, ['Hello'])
        except model.SafeException as ex:
            if "has no usable implementations" not in str(ex):
                raise
            if "Not signed with a trusted key" not in str(policy.handler.ex):
                raise
            # The recorded error was the expected one, so clear it.
            self.config.handler.ex = None
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
def testImport(self):
    # Drives "--import" through cli.main(): first with a file that does not
    # exist, then with 'Hello', checking that the signing key becomes trusted
    # and the feed ends up in the interface cache.
    from zeroinstall.injector import cli

    root_logger = getLogger()
    root_logger.disabled = True
    try:
        try:
            cli.main(['--import', '-v', 'NO-SUCH-FILE'], config=self.config)
        except model.SafeException as ex:
            assert 'NO-SUCH-FILE' in str(ex)
        else:
            assert False, "expected SafeException for a missing import file"
    finally:
        # Always re-enable logging, even if the check above failed.
        root_logger.disabled = False
        root_logger.setLevel(WARN)

    hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
    self.assertEqual(None, hello)

    with output_suppressed():
        run_server('6FCF121BE2390E0B.gpg')
        sys.stdin = Reply("Y\n")

        assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
        cli.main(['--import', 'Hello'], config=self.config)
        assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

        # Check we imported the interface after trusting the key
        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force=True)
        self.assertEqual(1, len(hello.implementations))

        # Shouldn't need to prompt the second time
        # NOTE(review): statement restored at a gap in this extract.  With no
        # stdin object, a second prompt fails instead of being answered.
        sys.stdin = None
        cli.main(['--import', 'Hello'], config=self.config)
def testSelections(self):
    # Downloads the implementation named in selections.xml via
    # "--download-only" and checks that it lands in the store.
    from zeroinstall.injector import cli

    # Close the document as soon as it is parsed instead of leaking the handle.
    with open("selections.xml") as stream:
        root = qdom.parse(stream)
    sels = selections.Selections(root)
    hello_sel = sels.selections['http://example.com:8000/Hello.xml']

    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")

        # Nothing has been fetched yet, so the lookup has to fail.
        self.assertRaises(NotStored, self.config.stores.lookup_any, hello_sel.digests)

        cli.main(['--download-only', 'selections.xml'], config=self.config)
        path = self.config.stores.lookup_any(hello_sel.digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

        assert sels.download_missing(self.config) is None
def testHelpers(self):
    # helpers.ensure_cached() must return selections whose implementation is
    # present in the store, leaving nothing more to download.
    from zeroinstall import helpers

    feed_url = 'http://example.com:8000/Hello.xml'
    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")
        sels = helpers.ensure_cached(feed_url, config=self.config)
        path = self.config.stores.lookup_any(sels.selections[feed_url].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
        assert sels.download_missing(self.config) is None
def testSelectionsWithFeed(self):
    # Like testSelections, except that the feed is downloaded and imported
    # into the cache before "--download-only" runs.
    from zeroinstall.injector import cli

    # Close the document as soon as it is parsed instead of leaking the handle.
    with open("selections.xml") as stream:
        root = qdom.parse(stream)
    sels = selections.Selections(root)
    feed_url = 'http://example.com:8000/Hello.xml'

    with output_suppressed():
        run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        sys.stdin = Reply("Y\n")

        imported = self.config.fetcher.download_and_import_feed(feed_url, self.config.iface_cache)
        self.config.handler.wait_for_blocker(imported)

        cli.main(['--download-only', 'selections.xml'], config=self.config)
        path = self.config.stores.lookup_any(sels.selections[feed_url].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

        assert sels.download_missing(self.config) is None
def testAcceptKey(self):
    # stdin is primed with "Y".  The run uses main='Missing', so success is
    # signalled by a SafeException that mentions "HelloWorld/Missing".
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("Y\n")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
def testAutoAcceptKey(self):
    # With auto_approve_keys switched on, stdin supplies only an empty reply
    # and the run must still get as far as the "HelloWorld/Missing" error.
    self.config.auto_approve_keys = True
    with output_suppressed():
        run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
def testDistro(self):
    # After solving for Native.xml, the master feed must be cached with no
    # implementations of its own, and its distro feed must hold two.
    with output_suppressed():
        native_url = 'http://example.com:8000/Native.xml'
        feeds = self.config.iface_cache

        # Initially, we don't have the feed at all...
        master_feed = feeds.get_feed(native_url)
        assert master_feed is None, master_feed

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
        policy = Policy(native_url, config=self.config)
        assert policy.need_download()

        solve = policy.solve_with_downloads()
        self.config.handler.wait_for_blocker(solve)

        # The master feed is cached now, but lists nothing itself.
        master_feed = feeds.get_feed(native_url)
        assert master_feed is not None
        assert master_feed.implementations == {}

        distro_feed_url = master_feed.get_distro_feed()
        assert distro_feed_url is not None
        distro_feed = feeds.get_feed(distro_feed_url)
        assert distro_feed is not None
        assert len(distro_feed.implementations) == 2, distro_feed.implementations
def testWrongSize(self):
    # The Hello-wrong-size feed must be rejected with a SafeException that
    # reports an incorrect archive size.
    with output_suppressed():
        run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                   '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
        policy = Policy('http://localhost:8000/Hello-wrong-size', config=self.config)
        assert policy.need_download()
        sys.stdin = Reply("Y\n")
        try:
            download_and_execute(policy, ['Hello'], main='Missing')
        except model.SafeException as ex:
            if "Downloaded archive has incorrect size" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException about the archive size"
def testRecipe(self):
    # Runs Recipe.xml with both archives served as one tuple; the run is
    # expected to end in a SafeException mentioning "HelloWorld/Missing".
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
        policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
def testSymlink(self):
    # RecipeSymlink.xml must be refused with the "unpack dir over symlink"
    # error, and no 'main' entry may exist in the implementation cache.
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
        policy = Policy(os.path.abspath('RecipeSymlink.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
        self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
def testAutopackage(self):
    # Runs Autopackage.xml against the served .autopackage file; the run is
    # expected to end in a SafeException mentioning "HelloWorld/Missing".
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        run_server('HelloWorld.autopackage')
        policy = Policy(os.path.abspath('Autopackage.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except model.SafeException as ex:
            if "HelloWorld/Missing" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected SafeException from download_and_execute()"
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
def testRecipeFailure(self):
    # Recipe.xml must fail with a DownloadError whose text contains
    # "Connection".
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        # NOTE(review): one original line (numbered 331) is absent from this
        # extract at this point; check whether a server should be started
        # here before relying on this test.
        policy = Policy(os.path.abspath('Recipe.xml'), config=self.config)
        try:
            download_and_execute(policy, [])
        except download.DownloadError as ex:
            if "Connection" not in str(ex):
                raise
        else:
            # A test that passes when nothing is raised would prove nothing.
            assert False, "expected DownloadError from download_and_execute()"
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
def testMirrors(self):
    # The server answers /Hello.xml with Give404 while also serving
    # latest.xml and a key under /0mirror; solving runs with feed_mirror set.
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        getLogger().setLevel(ERROR)
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        self.config.feed_mirror = 'http://example.com:8000/0mirror'

        refreshed = policy.solve_with_downloads()
        policy.handler.wait_for_blocker(refreshed)
        # NOTE(review): one original line (numbered 354) is absent from this
        # extract here; a check on the solve result may belong at this point.
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
def testReplay(self):
    # Seeds the cache with Hello-new.xml stamped 10000 seconds after the
    # file's mtime, then refreshes twice.  The second refresh must be
    # rejected as older than the cached copy.
    saved_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        getLogger().setLevel(ERROR)
        iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
        mtime = int(os.stat('Hello-new.xml').st_mtime)
        # Read through a context manager so the handle is closed promptly.
        with open('Hello-new.xml') as stream:
            new_xml = stream.read()
        self.config.iface_cache.update_feed_from_network(iface.uri, new_xml, mtime + 10000)

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
        policy = Policy('http://example.com:8000/Hello.xml', config=self.config)
        self.config.feed_mirror = 'http://example.com:8000/0mirror'

        # Update from mirror (should ignore out-of-date timestamp)
        refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
        policy.handler.wait_for_blocker(refreshed)

        # Update from upstream (should report an error)
        refreshed = policy.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
        try:
            policy.handler.wait_for_blocker(refreshed)
            raise Exception("Should have been rejected!")
        except model.SafeException as ex:
            assert "New feed's modification time is before old version" in str(ex)

        # Must finish with the newest version
        self.assertEqual(1235911552, self.config.iface_cache._get_signature_date(iface.uri))
    finally:
        # Put the real stdout back so later tests are not silenced.
        sys.stdout = saved_stdout
389 def testBackground(self
, verbose
= False):
# Test of background.spawn_background_update(): the Hello.xml feed is imported
# locally, the solver is run with minimal network use, and the updater is then
# expected to finish by raising SystemExit with code 1.
# NOTE(review): this extract is missing several original lines (the embedded
# numbers jump, e.g. 395 -> 398, 404 -> 411, 414 -> 418, 426 -> 430).  `pid`
# and `code` are used below without a visible definition, so the block is
# documented in place rather than rewritten.
390 p
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
391 self
.import_feed(p
.root
, 'Hello.xml')
393 p
.network_use
= model
.network_minimal
394 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
395 assert p
.ready
, p
.solver
.get_failure_reason()
# Callback installed as my_dbus.user_callback further down: it checks the
# offered actions and replies 'download' through registed_cb.
398 def choose_download(registed_cb
, nid
, actions
):
400 assert actions
== ['download', 'Download'], actions
401 registed_cb(nid
, 'download')
404 traceback
.print_exc()
411 sys
.stdout
= StringIO()
412 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
# Registers a NetworkManager() object under the NetworkManager service name
# and object path in my_dbus.system_services.
413 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
414 my_dbus
.user_callback
= choose_download
418 # The background handler runs in the same process
419 # as the tests, so don't let it abort.
420 if os
.getpid() == pid
:
421 raise SystemExit(code
)
422 # But, child download processes are OK
424 from zeroinstall
.injector
import config
# The default key lookup server is saved, cleared for the duration of the
# update, and written back at the end of the block.
425 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
426 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
430 background
.spawn_background_update(p
, verbose
)
432 except SystemExit as ex
:
433 self
.assertEquals(1, ex
.code
)
436 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
441 def testBackgroundVerbose(self
):
442 self
.testBackground(verbose
= True)
if __name__ == '__main__':
    # NOTE(review): only the cleanup call is visible in this extract; the
    # unittest.main() call and the try/finally around it are restored at the
    # gap (original lines 445-447) - confirm against the full file.
    try:
        unittest.main()
    finally:
        # unittest.main() leaves via SystemExit, so clean up in a finally.
        kill_server_process()