2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
# Insert '..' at the front of sys.path before the zeroinstall imports that
# follow, so they are looked up there first.
sys.path.insert(0, '..')

# Set the http_proxy environment variable for this process to localhost:8000.
os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall
.injector
import model
, autopolicy
, gpg
, iface_cache
, download
, reader
, trust
, handler
, background
, arch
, selections
, qdom
15 from zeroinstall
.zerostore
import Store
; Store
._add
_with
_helper
= lambda *unused
: False
16 from zeroinstall
.support
import basedir
, tasks
17 from zeroinstall
.injector
import fetch
# Module-level test configuration: sets fetch.DEFAULT_KEY_LOOKUP_SERVER to a
# localhost URL, replaces background._detach with a lambda returning False and
# replaces background._exec_gui with raise_gui.
# NOTE(review): raise_gui is not defined in the visible part of this file;
# presumably it lives in the lines missing between these statements — confirm.
21 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= 'http://localhost:3333/key-info'
29 background
._detach
= lambda: False
30 background
._exec
_gui
= raise_gui
# Replaces sys.stdout and sys.stderr with fresh StringIO buffers and later puts
# the saved originals back. A caught BaseException is re-raised as a plain
# Exception whose message is the original type plus its text, so that
# SystemExit does not abort the test run.
# NOTE(review): some lines of this function are absent from this extract (the
# embedded numbering jumps 35->37, 38->43 and 45->47); presumably the
# @contextmanager decorator, the try/finally keywords and the yield. The tests
# use it as `with output_suppressed():` — confirm against the real file.
33 def output_suppressed():
34 old_stdout
= sys
.stdout
35 old_stderr
= sys
.stderr
37 sys
.stdout
= StringIO()
38 sys
.stderr
= StringIO()
43 except BaseException
, ex
:
44 # Don't abort unit-tests if someone raises SystemExit
45 raise Exception(str(type(ex
)) + " " + str(ex
))
47 sys
.stdout
= old_stdout
48 sys
.stderr
= old_stderr
# Constructor taking a single `reply` argument.
# NOTE(review): neither the enclosing class statement nor the method body is
# visible in this extract; presumably this belongs to the `Reply` stand-in that
# the tests assign to sys.stdin (e.g. Reply("N\n")) — confirm.
51 def __init__(self
, reply
):
# handler.Handler subclass used by the tests, with slots `ex` and `tb`.
# report_error() asserts that no error is already stored in self.ex, and
# wait_for_blocker() delegates to handler.Handler.wait_for_blocker and contains
# a Python 2 three-argument raise of self.ex with traceback self.tb.
# NOTE(review): the __init__ header, the statements that assign or clear
# self.ex/self.tb and the condition guarding the re-raise are not visible in
# this extract (embedded numbering has gaps) — confirm against the real file.
57 class DummyHandler(handler
.Handler
):
58 __slots__
= ['ex', 'tb']
61 handler
.Handler
.__init
__(self
)
64 def wait_for_blocker(self
, blocker
):
66 handler
.Handler
.wait_for_blocker(self
, blocker
)
68 raise self
.ex
, None, self
.tb
70 def report_error(self
, ex
, tb
= None):
71 assert self
.ex
is None, self
.ex
76 #traceback.print_exc()
# NOTE(review): the class/def lines that own this return are not visible in
# this extract; presumably it is the state getter of the NetworkManager stub
# that testBackground registers in my_dbus.system_services — confirm.
80 return 3 # NM_STATUS_CONNECTED
# Test case derived from BaseTest. The visible fixture code writes
# data.thomas_key into a temporary file, passes that file to gpg.import_key and
# resets trust.trust_db.watchers to an empty list.
# NOTE(review): the method header owning this fixture code (presumably setUp)
# and the statements on the skipped source lines (83-85, 88, 90-91) are not
# visible in this extract — confirm against the real file.
82 class TestDownload(BaseTest
):
86 stream
= tempfile
.TemporaryFile()
87 stream
.write(data
.thomas_key
)
89 gpg
.import_key(stream
)
92 trust
.trust_db
.watchers
= []
# Fixture cleanup: calls BaseTest.tearDown, then, when self.child is not None,
# treats it as a process id, sends it SIGTERM and waits for it with waitpid.
# NOTE(review): the `def` line for this body (presumably tearDown) and any
# statement that resets self.child afterwards are not visible in this extract.
95 BaseTest
.tearDown(self
)
96 if self
.child
is not None:
97 os
.kill(self
.child
, signal
.SIGTERM
)
98 os
.waitpid(self
.child
, 0)
# Starts the helper server for 'Hello', its GPG key and the key-info URL, builds
# an AutoPolicy for http://localhost:8000/Hello with a DummyHandler, feeds "N\n"
# on stdin (presumably the answer to the key-trust prompt) and runs
# download_and_execute. The SafeException text is checked for
# "has no usable implementations" and the handler's stored error for
# "Not signed with a trusted key".
# NOTE(review): the try: line and the bodies of both `if` checks are missing
# from this extract (source lines 108, 110, 113, 115) — confirm what they do.
101 def testRejectKey(self
):
102 with
output_suppressed():
103 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
104 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
105 handler
= DummyHandler())
106 assert policy
.need_download()
107 sys
.stdin
= Reply("N\n")
109 policy
.download_and_execute(['Hello'])
111 except model
.SafeException
, ex
:
112 if "has no usable implementations" not in str(ex
):
114 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
# Same scenario as testRejectKey, but the server offers 'Hello.xml' and the
# policy targets http://example.com:8000/Hello.xml. "N\n" is fed on stdin, and
# the SafeException text and the handler's stored error are checked for the
# same two messages.
# NOTE(review): the try: line and the bodies of both `if` checks are missing
# from this extract (source lines 124, 126, 129, 131) — confirm what they do.
117 def testRejectKeyXML(self
):
118 with
output_suppressed():
119 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
120 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False,
121 handler
= DummyHandler())
122 assert policy
.need_download()
123 sys
.stdin
= Reply("N\n")
125 policy
.download_and_execute(['Hello'])
127 except model
.SafeException
, ex
:
128 if "has no usable implementations" not in str(ex
):
130 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
# Exercises `cli.main(['--import', ...])`.
# 1. With the root logger disabled, importing 'NO-SUCH-FILE' must raise a
#    SafeException naming that file; the logger is then re-enabled at WARN.
# 2. The feed http://localhost:8000/Hello is not yet in the cache.
# 3. With the key served and "Y\n" on stdin, importing 'Hello' flips the key
#    DE937DD4...E2390E0B from untrusted to trusted, and the cached feed then
#    has exactly one implementation.
# 4. A second import is run, which per the inline comment should not prompt.
# NOTE(review): try:/finally-style lines and whatever prepares the second
# import (source lines 138-139, 141, 144, 164) are missing from this extract.
133 def testImport(self
):
134 from zeroinstall
.injector
import cli
136 rootLogger
= getLogger()
137 rootLogger
.disabled
= True
140 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
142 except model
.SafeException
, ex
:
143 assert 'NO-SUCH-FILE' in str(ex
)
145 rootLogger
.disabled
= False
146 rootLogger
.setLevel(WARN
)
148 hello
= iface_cache
.iface_cache
.get_feed('http://localhost:8000/Hello')
149 self
.assertEquals(None, hello
)
151 with
output_suppressed():
152 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
153 sys
.stdin
= Reply("Y\n")
155 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 cli
.main(['--import', 'Hello'])
157 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
159 # Check we imported the interface after trusting the key
160 hello
= iface_cache
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
161 self
.assertEquals(1, len(hello
.implementations
))
163 # Shouldn't need to prompt the second time
165 cli
.main(['--import', 'Hello'])
# Parses selections.xml into a Selections object, serves the feed, key,
# key-info and HelloWorld.tgz, feeds "Y\n" on stdin and calls
# cli._download_missing_selections with a minimal Options (dry_run = False).
# It then checks that the store path looked up from the selection's digests
# contains HelloWorld/main, and that download_missing() now returns None.
# NOTE(review): indentation is lost in this extract, so which statements sit
# inside the `with` block cannot be read off here.
167 def testSelections(self
):
168 from zeroinstall
.injector
.cli
import _download_missing_selections
169 root
= qdom
.parse(file("selections.xml"))
170 sels
= selections
.Selections(root
)
171 class Options
: dry_run
= False
173 with
output_suppressed():
174 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
175 sys
.stdin
= Reply("Y\n")
176 _download_missing_selections(Options(), sels
)
177 path
= iface_cache
.iface_cache
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
178 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
180 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
# Serves the feed, key, key-info and HelloWorld.tgz, feeds "Y\n" on stdin and
# calls helpers.ensure_cached for http://example.com:8000/Hello.xml. It then
# checks that the store path looked up from the returned selection's digests
# contains HelloWorld/main, and that download_missing() returns None.
# NOTE(review): indentation is lost in this extract, so which statements sit
# inside the `with` block cannot be read off here.
182 def testHelpers(self
):
183 from zeroinstall
import helpers
185 with
output_suppressed():
186 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
187 sys
.stdin
= Reply("Y\n")
188 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml')
189 path
= iface_cache
.iface_cache
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
190 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
191 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
# Variant of testSelections in which the feed is first downloaded and imported
# through fetch.Fetcher.download_and_import_feed (waited on with
# wait_for_blocker) before _download_missing_selections is called. The same
# HelloWorld/main and download_missing() checks follow.
# NOTE(review): `handler` is passed to Fetcher and used for wait_for_blocker
# right after `Handler` is imported; the line that binds it (presumably a
# Handler instance, source line 204) is missing from this extract — confirm.
193 def testSelectionsWithFeed(self
):
194 from zeroinstall
.injector
.cli
import _download_missing_selections
195 root
= qdom
.parse(file("selections.xml"))
196 sels
= selections
.Selections(root
)
197 class Options
: dry_run
= False
199 with
output_suppressed():
200 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
201 sys
.stdin
= Reply("Y\n")
203 from zeroinstall
.injector
.handler
import Handler
205 fetcher
= fetch
.Fetcher(handler
)
206 handler
.wait_for_blocker(fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache
.iface_cache
))
208 _download_missing_selections(Options(), sels
)
209 path
= iface_cache
.iface_cache
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
210 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
212 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
# Serves 'Hello', its key, key-info and HelloWorld.tgz, feeds "Y\n" on stdin
# and runs download_and_execute(['Hello'], main = 'Missing'). The resulting
# SafeException text is checked for "HelloWorld/Missing".
# NOTE(review): the try: line and the body of the `if` check (source lines
# 221, 223, 226) are missing from this extract — confirm what they do.
214 def testAcceptKey(self
):
215 with
output_suppressed():
216 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
217 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
218 handler
= DummyHandler())
219 assert policy
.need_download()
220 sys
.stdin
= Reply("Y\n")
222 policy
.download_and_execute(['Hello'], main
= 'Missing')
224 except model
.SafeException
, ex
:
225 if "HelloWorld/Missing" not in str(ex
):
# Checks distribution-feed handling for http://example.com:8000/Native.xml.
# The feed starts out absent from the cache. The signing key is trusted for
# example.com:8000, the server is started, and a policy is solved with
# downloads. Afterwards the master feed exists with no implementations of its
# own, it reports a distro feed URL, and that distro feed is cached with
# exactly two implementations.
# NOTE(review): `h` is passed as the policy handler and used for
# wait_for_blocker, but the line that binds it (source line 238) is missing
# from this extract; source lines 244-245 are also not visible — confirm.
228 def testDistro(self
):
229 with
output_suppressed():
230 native_url
= 'http://example.com:8000/Native.xml'
232 # Initially, we don't have the feed at all...
233 master_feed
= iface_cache
.iface_cache
.get_feed(native_url
)
234 assert master_feed
is None, master_feed
236 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
237 self
.child
= server
.handle_requests('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
239 policy
= autopolicy
.AutoPolicy(native_url
, download_only
= False, handler
= h
)
240 assert policy
.need_download()
242 solve
= policy
.solve_with_downloads()
243 h
.wait_for_blocker(solve
)
246 master_feed
= iface_cache
.iface_cache
.get_feed(native_url
)
247 assert master_feed
is not None
248 assert master_feed
.implementations
== {}
250 distro_feed_url
= master_feed
.get_distro_feed()
251 assert distro_feed_url
is not None
252 distro_feed
= iface_cache
.iface_cache
.get_feed(distro_feed_url
)
253 assert distro_feed
is not None
254 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
# Serves 'Hello-wrong-size' plus key, key-info and HelloWorld.tgz, feeds "Y\n"
# on stdin and runs download_and_execute(['Hello'], main = 'Missing'). The
# SafeException text is checked for "Downloaded archive has incorrect size".
# NOTE(review): the try: line and the body of the `if` check (source lines
# 264, 266, 269) are missing from this extract — confirm what they do.
256 def testWrongSize(self
):
257 with
output_suppressed():
258 self
.child
= server
.handle_requests('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
259 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
260 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello-wrong-size', download_only
= False,
261 handler
= DummyHandler())
262 assert policy
.need_download()
263 sys
.stdin
= Reply("Y\n")
265 policy
.download_and_execute(['Hello'], main
= 'Missing')
267 except model
.SafeException
, ex
:
268 if "Downloaded archive has incorrect size" not in str(ex
):
# Replaces sys.stdout with a StringIO, starts the server with the tuple
# ('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'), builds a policy for the local
# Recipe.xml and runs download_and_execute([]). The SafeException text is
# checked for "HelloWorld/Missing".
# NOTE(review): lines that save/restore sys.stdout, the try: line and the body
# of the `if` check (source lines 272-273, 277, 279, 282-285) are missing from
# this extract — confirm what they do.
271 def testRecipe(self
):
274 sys
.stdout
= StringIO()
275 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
276 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False)
278 policy
.download_and_execute([])
280 except model
.SafeException
, ex
:
281 if "HelloWorld/Missing" not in str(ex
):
# Replaces sys.stdout with a StringIO, serves ('HelloWorld.tar.bz2',
# 'HelloSym.tgz'), builds a policy for the local RecipeSymlink.xml with a
# DummyHandler and runs download_and_execute([]). The SafeException text is
# checked for 'Attempt to unpack dir over symlink "HelloWorld"', and
# basedir.load_first_cache('0install.net', 'implementations', 'main') must
# be None.
# NOTE(review): lines that save/restore sys.stdout, the try: line and the body
# of the `if` check (source lines 287-288, 293, 295, 298, 300-302) are missing
# from this extract — confirm what they do.
286 def testSymlink(self
):
289 sys
.stdout
= StringIO()
290 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
291 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('RecipeSymlink.xml'), download_only
= False,
292 handler
= DummyHandler())
294 policy
.download_and_execute([])
296 except model
.SafeException
, ex
:
297 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
299 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
# Replaces sys.stdout with a StringIO, serves 'HelloWorld.autopackage', builds
# a policy for the local Autopackage.xml and runs download_and_execute([]).
# The SafeException text is checked for "HelloWorld/Missing".
# NOTE(review): lines that save/restore sys.stdout, the try: line and the body
# of the `if` check (source lines 304-305, 309, 311, 314-317) are missing from
# this extract — confirm what they do.
303 def testAutopackage(self
):
306 sys
.stdout
= StringIO()
307 self
.child
= server
.handle_requests('HelloWorld.autopackage')
308 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Autopackage.xml'), download_only
= False)
310 policy
.download_and_execute([])
312 except model
.SafeException
, ex
:
313 if "HelloWorld/Missing" not in str(ex
):
# Replaces sys.stdout with a StringIO, starts the server with '*', builds a
# policy for the local Recipe.xml with a DummyHandler and runs
# download_and_execute([]). A download.DownloadError is caught and its text is
# checked for "Connection".
# NOTE(review): lines that save/restore sys.stdout, the try: line and the body
# of the `if` check (source lines 319-320, 325, 327, 330-333) are missing from
# this extract; what server.handle_requests('*') means is not shown here.
318 def testRecipeFailure(self
):
321 sys
.stdout
= StringIO()
322 self
.child
= server
.handle_requests('*')
323 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False,
324 handler
= DummyHandler())
326 policy
.download_and_execute([])
328 except download
.DownloadError
, ex
:
329 if "Connection" not in str(ex
):
# Replaces sys.stdout with a StringIO, raises the root log level to ERROR and
# trusts the signing key for example.com:8000. The server is started with a
# Give404 for '/Hello.xml', then 'latest.xml' and the mirror key path. The
# policy's fetcher.feed_mirror is pointed at http://example.com:8000/0mirror,
# and solve_with_downloads() is waited on via the policy handler.
# NOTE(review): lines that save/restore sys.stdout and any final assertions
# (source lines 335-336, 343, 346-349) are missing from this extract.
334 def testMirrors(self
):
337 sys
.stdout
= StringIO()
338 getLogger().setLevel(ERROR
)
339 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
340 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
341 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False)
342 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
344 refreshed
= policy
.solve_with_downloads()
345 policy
.handler
.wait_for_blocker(refreshed
)
# Replay-protection test. Hello-new.xml is first stored in the cache with a
# modification time 10000 seconds past the file's mtime. With the key trusted
# and the mirror configured, the feed is re-fetched twice through
# policy.fetcher.download_and_import_feed. Per the inline comments the first
# fetch is the mirror update and the second is the upstream update. The second
# must raise a SafeException containing "New feed's modification time is
# before old version". Finally the cached signature date must still be
# 1235911552.
# NOTE(review): lines that save/restore sys.stdout and the try: line (source
# lines 351-352, 370, 378-380) are missing from this extract.
350 def testReplay(self
):
353 sys
.stdout
= StringIO()
354 getLogger().setLevel(ERROR
)
355 iface
= iface_cache
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
356 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
357 iface_cache
.iface_cache
.update_feed_from_network(iface
.uri
, file('Hello-new.xml').read(), mtime
+ 10000)
359 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
360 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
361 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False)
362 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
364 # Update from mirror (should ignore out-of-date timestamp)
365 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
366 policy
.handler
.wait_for_blocker(refreshed
)
368 # Update from upstream (should report an error)
369 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
371 policy
.handler
.wait_for_blocker(refreshed
)
372 raise Exception("Should have been rejected!")
373 except model
.SafeException
, ex
:
374 assert "New feed's modification time is before old version" in str(ex
)
376 # Must finish with the newest version
377 self
.assertEquals(1235911552, iface_cache
.iface_cache
._get
_signature
_date
(iface
.uri
))
# Background-update test; `verbose` is forwarded to
# background.spawn_background_update.
# Setup: a policy for http://example.com:8000/Hello.xml is loaded from the
# local Hello.xml, network use is set to network_minimal and the solver is run
# for the host architecture.
# Stubs: the nested choose_download callback asserts the offered actions are
# ['download', 'Download'] and answers 'download'. It is installed as
# my_dbus.user_callback, next to a NetworkManager stub in
# my_dbus.system_services.
# Run: fetch.DEFAULT_KEY_LOOKUP_SERVER is set to None around the call and put
# back afterwards. A SystemExit from the update must carry code 1.
# NOTE(review): many lines are missing from this extract (e.g. the definitions
# of `pid` and `code`, the function owning the os.getpid() check, the
# try/finally keywords and the sys.stdout restore) — confirm against the real
# file before relying on this summary.
381 def testBackground(self
, verbose
= False):
382 p
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml')
383 reader
.update(iface_cache
.iface_cache
.get_interface(p
.root
), 'Hello.xml')
385 p
.network_use
= model
.network_minimal
386 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
390 def choose_download(registed_cb
, nid
, actions
):
392 assert actions
== ['download', 'Download'], actions
393 registed_cb(nid
, 'download')
396 traceback
.print_exc()
403 sys
.stdout
= StringIO()
404 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
405 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
406 my_dbus
.user_callback
= choose_download
410 # The background handler runs in the same process
411 # as the tests, so don't let it abort.
412 if os
.getpid() == pid
:
413 raise SystemExit(code
)
414 # But, child download processes are OK
416 key_info
= fetch
.DEFAULT_KEY_LOOKUP_SERVER
417 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= None
421 background
.spawn_background_update(p
, verbose
)
423 except SystemExit, ex
:
424 self
.assertEquals(1, ex
.code
)
427 fetch
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
432 def testBackgroundVerbose(self
):
433 self
.testBackground(verbose
= True)
435 if __name__
== '__main__':