2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
.injector
import model
, gpg
, iface_cache
, download
, reader
, trust
, handler
, background
, arch
, selections
, qdom
, run
15 from zeroinstall
.injector
.policy
import Policy
16 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
17 from zeroinstall
.support
import basedir
, tasks
18 from zeroinstall
.injector
import fetch
22 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= 'http://localhost:3333/key-info'
30 background
._detach
= lambda: False
31 background
._exec
_gui
= raise_gui
34 def output_suppressed():
35 old_stdout
= sys
.stdout
36 old_stderr
= sys
.stderr
38 sys
.stdout
= StringIO()
39 sys
.stderr
= StringIO()
44 except BaseException
, ex
:
45 # Don't abort unit-tests if someone raises SystemExit
46 raise Exception(str(type(ex
)) + " " + str(ex
))
48 sys
.stdout
= old_stdout
49 sys
.stderr
= old_stderr
52 def __init__(self
, reply
):
58 def download_and_execute(policy
, prog_args
, main
= None):
59 downloaded
= policy
.solve_and_download_impls()
61 policy
.config
.handler
.wait_for_blocker(downloaded
)
62 run
.execute_selections(policy
.solver
.selections
, prog_args
, stores
= policy
.config
.stores
, main
= main
)
66 return 3 # NM_STATUS_CONNECTED
68 class TestDownload(BaseTest
):
72 self
.config
.fetcher
= fetch
.Fetcher(self
.config
.handler
)
74 stream
= tempfile
.TemporaryFile()
75 stream
.write(data
.thomas_key
)
77 gpg
.import_key(stream
)
80 trust
.trust_db
.watchers
= []
83 BaseTest
.tearDown(self
)
84 if self
.child
is not None:
85 os
.kill(self
.child
, signal
.SIGTERM
)
86 os
.waitpid(self
.child
, 0)
89 def testRejectKey(self
):
90 with
output_suppressed():
91 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
92 policy
= Policy('http://localhost:8000/Hello', config
= self
.config
)
93 assert policy
.need_download()
94 sys
.stdin
= Reply("N\n")
96 download_and_execute(policy
, ['Hello'])
98 except model
.SafeException
, ex
:
99 if "has no usable implementations" not in str(ex
):
101 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
103 self
.config
.handler
.ex
= None
105 def testRejectKeyXML(self
):
106 with
output_suppressed():
107 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
108 policy
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
109 assert policy
.need_download()
110 sys
.stdin
= Reply("N\n")
112 download_and_execute(policy
, ['Hello'])
114 except model
.SafeException
, ex
:
115 if "has no usable implementations" not in str(ex
):
117 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
119 self
.config
.handler
.ex
= None
121 def testImport(self
):
122 from zeroinstall
.injector
import cli
124 rootLogger
= getLogger()
125 rootLogger
.disabled
= True
128 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
130 except model
.SafeException
, ex
:
131 assert 'NO-SUCH-FILE' in str(ex
)
133 rootLogger
.disabled
= False
134 rootLogger
.setLevel(WARN
)
136 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
137 self
.assertEquals(None, hello
)
139 with
output_suppressed():
140 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
141 sys
.stdin
= Reply("Y\n")
143 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
144 cli
.main(['--import', 'Hello'])
145 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
147 # Check we imported the interface after trusting the key
148 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
149 self
.assertEquals(1, len(hello
.implementations
))
151 # Shouldn't need to prompt the second time
153 cli
.main(['--import', 'Hello'])
155 def testSelections(self
):
156 from zeroinstall
.injector
import cli
157 root
= qdom
.parse(file("selections.xml"))
158 sels
= selections
.Selections(root
)
159 class Options
: dry_run
= False
161 with
output_suppressed():
162 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
163 sys
.stdin
= Reply("Y\n")
165 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
169 cli
.main(['--download-only', 'selections.xml'])
170 path
= self
.config
.iface_cache
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
171 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
173 assert sels
.download_missing(self
.config
.iface_cache
, None) is None
175 def testHelpers(self
):
176 from zeroinstall
import helpers
178 with
output_suppressed():
179 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
180 sys
.stdin
= Reply("Y\n")
181 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml')
182 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
183 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
184 assert sels
.download_missing(self
.config
.iface_cache
, None) is None
186 def testSelectionsWithFeed(self
):
187 from zeroinstall
.injector
import cli
188 root
= qdom
.parse(file("selections.xml"))
189 sels
= selections
.Selections(root
)
191 with
output_suppressed():
192 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
193 sys
.stdin
= Reply("Y\n")
195 self
.config
.handler
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
197 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
198 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
199 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
201 assert sels
.download_missing(self
.config
.iface_cache
, None) is None
203 def testAcceptKey(self
):
204 with
output_suppressed():
205 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
206 policy
= Policy('http://localhost:8000/Hello', config
= self
.config
)
207 assert policy
.need_download()
208 sys
.stdin
= Reply("Y\n")
210 download_and_execute(policy
, ['Hello'], main
= 'Missing')
212 except model
.SafeException
, ex
:
213 if "HelloWorld/Missing" not in str(ex
):
216 def testDistro(self
):
217 with
output_suppressed():
218 native_url
= 'http://example.com:8000/Native.xml'
220 # Initially, we don't have the feed at all...
221 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
222 assert master_feed
is None, master_feed
224 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
225 self
.child
= server
.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
226 policy
= Policy(native_url
, config
= self
.config
)
227 assert policy
.need_download()
229 solve
= policy
.solve_with_downloads()
230 self
.config
.handler
.wait_for_blocker(solve
)
233 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
234 assert master_feed
is not None
235 assert master_feed
.implementations
== {}
237 distro_feed_url
= master_feed
.get_distro_feed()
238 assert distro_feed_url
is not None
239 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
240 assert distro_feed
is not None
241 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
243 def testWrongSize(self
):
244 with
output_suppressed():
245 self
.child
= server
.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
246 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
247 policy
= Policy('http://localhost:8000/Hello-wrong-size', config
= self
.config
)
248 assert policy
.need_download()
249 sys
.stdin
= Reply("Y\n")
251 download_and_execute(policy
, ['Hello'], main
= 'Missing')
253 except model
.SafeException
, ex
:
254 if "Downloaded archive has incorrect size" not in str(ex
):
257 def testRecipe(self
):
260 sys
.stdout
= StringIO()
261 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
262 policy
= Policy(os
.path
.abspath('Recipe.xml'), config
= self
.config
)
264 download_and_execute(policy
, [])
266 except model
.SafeException
, ex
:
267 if "HelloWorld/Missing" not in str(ex
):
272 def testSymlink(self
):
275 sys
.stdout
= StringIO()
276 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
277 policy
= Policy(os
.path
.abspath('RecipeSymlink.xml'), config
= self
.config
)
279 download_and_execute(policy
, [])
281 except model
.SafeException
, ex
:
282 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
284 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
288 def testAutopackage(self
):
291 sys
.stdout
= StringIO()
292 self
.child
= server
.handle_requests('HelloWorld.autopackage')
293 policy
= Policy(os
.path
.abspath('Autopackage.xml'), config
= self
.config
)
295 download_and_execute(policy
, [])
297 except model
.SafeException
, ex
:
298 if "HelloWorld/Missing" not in str(ex
):
303 def testRecipeFailure(self
):
306 sys
.stdout
= StringIO()
307 self
.child
= server
.handle_requests('*')
308 policy
= Policy(os
.path
.abspath('Recipe.xml'), config
= self
.config
)
310 download_and_execute(policy
, [])
312 except download
.DownloadError
, ex
:
313 if "Connection" not in str(ex
):
318 def testMirrors(self
):
321 sys
.stdout
= StringIO()
322 getLogger().setLevel(ERROR
)
323 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
324 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
325 policy
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
326 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
328 refreshed
= policy
.solve_with_downloads()
329 policy
.handler
.wait_for_blocker(refreshed
)
334 def testReplay(self
):
337 sys
.stdout
= StringIO()
338 getLogger().setLevel(ERROR
)
339 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
340 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
341 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, file('Hello-new.xml').read(), mtime
+ 10000)
343 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
344 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
345 policy
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
346 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
348 # Update from mirror (should ignore out-of-date timestamp)
349 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
350 policy
.handler
.wait_for_blocker(refreshed
)
352 # Update from upstream (should report an error)
353 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
355 policy
.handler
.wait_for_blocker(refreshed
)
356 raise Exception("Should have been rejected!")
357 except model
.SafeException
, ex
:
358 assert "New feed's modification time is before old version" in str(ex
)
360 # Must finish with the newest version
361 self
.assertEquals(1235911552, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
365 def testBackground(self
, verbose
= False):
366 p
= Policy('http://example.com:8000/Hello.xml', config
= self
.config
)
367 reader
.update(p
.config
.iface_cache
.get_interface(p
.root
), 'Hello.xml')
369 p
.network_use
= model
.network_minimal
370 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
374 def choose_download(registed_cb
, nid
, actions
):
376 assert actions
== ['download', 'Download'], actions
377 registed_cb(nid
, 'download')
380 traceback
.print_exc()
387 sys
.stdout
= StringIO()
388 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
389 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
390 my_dbus
.user_callback
= choose_download
394 # The background handler runs in the same process
395 # as the tests, so don't let it abort.
396 if os
.getpid() == pid
:
397 raise SystemExit(code
)
398 # But, child download processes are OK
400 key_info
= fetch
.DEFAULT_KEY_LOOKUP_SERVER
401 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= None
405 background
.spawn_background_update(p
, verbose
)
407 except SystemExit, ex
:
408 self
.assertEquals(1, ex
.code
)
411 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
416 def testBackgroundVerbose(self
):
417 self
.testBackground(verbose
= True)
419 if __name__
== '__main__':