1 #!/usr/bin/env python2.5
2 from __future__
import with_statement
3 from basetest
import BaseTest
4 import sys
, tempfile
, os
5 from StringIO
import StringIO
6 import unittest
, signal
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
# Make the parent directory the first entry on the import search path.
sys.path.insert(0, '..')

# Export the parent directory's absolute path as PYTHONPATH and set
# http_proxy to a localhost address in this process's environment.
os.environ.update({
    'PYTHONPATH': os.path.abspath('..'),
    "http_proxy": "localhost:8000",
})
15 from zeroinstall
.injector
import model
, autopolicy
, gpg
, iface_cache
, download
, reader
, trust
, handler
, background
, arch
, selections
, qdom
16 from zeroinstall
.zerostore
import Store
; Store
._add
_with
_helper
= lambda *unused
: False
17 from zeroinstall
.support
import basedir
, tasks
18 from zeroinstall
.injector
import fetch
# Override the fetch module's default key-lookup server with a localhost URL.
fetch.DEFAULT_KEY_LOOKUP_SERVER = 'http://localhost:3333/key-info'
# Replace two private hooks on the background module: _detach becomes a
# callable that always returns False, and _exec_gui becomes raise_gui
# (raise_gui is not shown in this excerpt -- presumably a test hook defined
# earlier in the file; confirm).
background._detach = lambda: False
background._exec_gui = raise_gui

# Register my_dbus under each dbus module name used here, and expose it as
# its own `types` attribute so 'dbus.types' resolves to the same object.
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
38 def output_suppressed():
39 old_stdout
= sys
.stdout
40 old_stderr
= sys
.stderr
42 sys
.stdout
= StringIO()
43 sys
.stderr
= StringIO()
46 sys
.stdout
= old_stdout
47 sys
.stderr
= old_stderr
50 def __init__(self
, reply
):
56 class DummyHandler(handler
.Handler
):
57 __slots__
= ['ex', 'tb']
60 handler
.Handler
.__init
__(self
)
63 def wait_for_blocker(self
, blocker
):
65 handler
.Handler
.wait_for_blocker(self
, blocker
)
67 raise self
.ex
, None, self
.tb
69 def report_error(self
, ex
, tb
= None):
70 assert self
.ex
is None, self
.ex
75 #traceback.print_exc()
77 class TestDownload(BaseTest
):
81 stream
= tempfile
.TemporaryFile()
82 stream
.write(data
.thomas_key
)
84 gpg
.import_key(stream
)
87 trust
.trust_db
.watchers
= []
90 BaseTest
.tearDown(self
)
91 if self
.child
is not None:
92 os
.kill(self
.child
, signal
.SIGTERM
)
93 os
.waitpid(self
.child
, 0)
96 def testRejectKey(self
):
97 with
output_suppressed():
98 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
99 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
100 handler
= DummyHandler())
101 assert policy
.need_download()
102 sys
.stdin
= Reply("N\n")
104 policy
.download_and_execute(['Hello'])
106 except model
.SafeException
, ex
:
107 if "Can't find all required implementations" not in str(ex
):
109 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
112 def testRejectKeyXML(self
):
113 with
output_suppressed():
114 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
115 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False,
116 handler
= DummyHandler())
117 assert policy
.need_download()
118 sys
.stdin
= Reply("N\n")
120 policy
.download_and_execute(['Hello'])
122 except model
.SafeException
, ex
:
123 if "Can't find all required implementations" not in str(ex
):
125 if "Not signed with a trusted key" not in str(policy
.handler
.ex
):
128 def testImport(self
):
129 from zeroinstall
.injector
import cli
131 rootLogger
= getLogger()
132 rootLogger
.disabled
= True
135 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
137 except model
.SafeException
, ex
:
138 assert 'NO-SUCH-FILE' in str(ex
)
140 rootLogger
.disabled
= False
141 rootLogger
.setLevel(WARN
)
143 hello
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello')
144 self
.assertEquals(0, len(hello
.implementations
))
146 with
output_suppressed():
147 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
148 sys
.stdin
= Reply("Y\n")
150 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
151 cli
.main(['--import', 'Hello'])
152 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
154 # Check we imported the interface after trusting the key
155 reader
.update_from_cache(hello
)
156 self
.assertEquals(1, len(hello
.implementations
))
158 # Shouldn't need to prompt the second time
160 cli
.main(['--import', 'Hello'])
162 def testSelections(self
):
163 from zeroinstall
.injector
.cli
import _download_missing_selections
164 root
= qdom
.parse(file("selections.xml"))
165 sels
= selections
.Selections(root
)
166 class Options
: dry_run
= False
168 with
output_suppressed():
169 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
170 sys
.stdin
= Reply("Y\n")
171 _download_missing_selections(Options(), sels
)
172 path
= iface_cache
.iface_cache
.stores
.lookup(sels
.selections
['http://example.com:8000/Hello.xml'].id)
173 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
175 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
177 def testSelectionsWithFeed(self
):
178 from zeroinstall
.injector
.cli
import _download_missing_selections
179 root
= qdom
.parse(file("selections.xml"))
180 sels
= selections
.Selections(root
)
181 class Options
: dry_run
= False
183 with
output_suppressed():
184 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
185 sys
.stdin
= Reply("Y\n")
187 from zeroinstall
.injector
.handler
import Handler
189 fetcher
= fetch
.Fetcher(handler
)
190 handler
.wait_for_blocker(fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', iface_cache
.iface_cache
))
192 _download_missing_selections(Options(), sels
)
193 path
= iface_cache
.iface_cache
.stores
.lookup(sels
.selections
['http://example.com:8000/Hello.xml'].id)
194 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
196 assert sels
.download_missing(iface_cache
.iface_cache
, None) is None
198 def testAcceptKey(self
):
199 with
output_suppressed():
200 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
201 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
202 handler
= DummyHandler())
203 assert policy
.need_download()
204 sys
.stdin
= Reply("Y\n")
206 policy
.download_and_execute(['Hello'], main
= 'Missing')
208 except model
.SafeException
, ex
:
209 if "HelloWorld/Missing" not in str(ex
):
212 def testRecipe(self
):
215 sys
.stdout
= StringIO()
216 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
217 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False)
219 policy
.download_and_execute([])
221 except model
.SafeException
, ex
:
222 if "HelloWorld/Missing" not in str(ex
):
227 def testSymlink(self
):
230 sys
.stdout
= StringIO()
231 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
232 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('RecipeSymlink.xml'), download_only
= False,
233 handler
= DummyHandler())
235 policy
.download_and_execute([])
237 except model
.SafeException
, ex
:
238 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
240 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
244 def testAutopackage(self
):
247 sys
.stdout
= StringIO()
248 self
.child
= server
.handle_requests('HelloWorld.autopackage')
249 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Autopackage.xml'), download_only
= False)
251 policy
.download_and_execute([])
253 except model
.SafeException
, ex
:
254 if "HelloWorld/Missing" not in str(ex
):
259 def testRecipeFailure(self
):
262 sys
.stdout
= StringIO()
263 self
.child
= server
.handle_requests('*')
264 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False,
265 handler
= DummyHandler())
267 policy
.download_and_execute([])
269 except download
.DownloadError
, ex
:
270 if "Connection" not in str(ex
):
275 def testMirrors(self
):
278 sys
.stdout
= StringIO()
279 getLogger().setLevel(ERROR
)
280 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
281 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
282 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False)
283 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
285 refreshed
= policy
.solve_with_downloads()
286 policy
.handler
.wait_for_blocker(refreshed
)
291 def testReplay(self
):
294 sys
.stdout
= StringIO()
295 getLogger().setLevel(ERROR
)
296 iface
= iface_cache
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
297 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
298 iface_cache
.iface_cache
.update_interface_from_network(iface
, file('Hello-new.xml').read(), mtime
+ 10000)
300 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
301 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
302 policy
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml', download_only
= False)
303 policy
.fetcher
.feed_mirror
= 'http://example.com:8000/0mirror'
305 # Update from mirror (should ignore out-of-date timestamp)
306 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
307 policy
.handler
.wait_for_blocker(refreshed
)
309 # Update from upstream (should report an error)
310 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
312 policy
.handler
.wait_for_blocker(refreshed
)
313 raise Exception("Should have been rejected!")
314 except model
.SafeException
, ex
:
315 assert "New interface's modification time is before old version" in str(ex
)
317 # Must finish with the newest version
318 self
.assertEquals(1235911552, iface_cache
.iface_cache
._get
_signature
_date
(iface
.uri
))
322 def testBackground(self
, verbose
= False):
323 p
= autopolicy
.AutoPolicy('http://example.com:8000/Hello.xml')
324 reader
.update(iface_cache
.iface_cache
.get_interface(p
.root
), 'Hello.xml')
326 p
.network_use
= model
.network_minimal
327 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
331 def choose_download(registed_cb
, nid
, actions
):
333 assert actions
== ['download', 'Download'], actions
334 registed_cb(nid
, 'download')
337 traceback
.print_exc()
344 sys
.stdout
= StringIO()
345 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
346 my_dbus
.user_callback
= choose_download
350 # The background handler runs in the same process
351 # as the tests, so don't let it abort.
352 if os
.getpid() == pid
:
353 raise SystemExit(code
)
354 # But, child download processes are OK
359 background
.spawn_background_update(p
, verbose
)
361 except SystemExit, ex
:
362 self
.assertEquals(1, ex
.code
)
369 def testBackgroundVerbose(self
):
370 self
.testBackground(verbose
= True)
# Module-level suite holding every test method found on TestDownload.
suite = unittest.TestLoader().loadTestsFromTestCase(TestDownload)
373 if __name__
== '__main__':
374 sys
.argv
.append('-v')