1 #!/usr/bin/env python2.4
2 from basetest
import BaseTest
3 import sys
, tempfile
, os
, shutil
4 from StringIO
import StringIO
5 import unittest
, signal
6 from logging
import getLogger
, DEBUG
, INFO
, WARN
, ERROR
8 sys
.path
.insert(0, '..')
10 from zeroinstall
.injector
import model
, autopolicy
, gpg
, iface_cache
, download
, reader
, trust
, handler
, background
, arch
11 from zeroinstall
.zerostore
import Store
; Store
._add
_with
_helper
= lambda *unused
: False
12 from zeroinstall
.support
import basedir
, tasks
# Test-harness monkeypatches, applied once when this module is imported.

# Swap out two private hooks of the background module: _exec_gui is
# redirected to raise_gui, and _detach becomes a stub that always
# returns False.
background._exec_gui = raise_gui
background._detach = lambda: False

# Register my_dbus under every dbus module name, so that a later
# "import dbus", "import dbus.glib" or "import dbus.types" resolves to it
# (my_dbus is presumably a test stand-in for the real bindings -- confirm).
# my_dbus.types points back at my_dbus itself so attribute access through
# the package gives the same object as the sys.modules entry.
my_dbus.types = my_dbus
sys.modules.update({
    'dbus': my_dbus,
    'dbus.glib': my_dbus,
    'dbus.types': my_dbus,
})
30 def __init__(self
, reply
):
36 class DummyHandler(handler
.Handler
):
37 __slots__
= ['ex', 'tb']
40 handler
.Handler
.__init
__(self
)
43 def wait_for_blocker(self
, blocker
):
45 handler
.Handler
.wait_for_blocker(self
, blocker
)
47 raise self
.ex
, None, self
.tb
49 def report_error(self
, ex
, tb
= None):
50 assert self
.ex
is None, self
.ex
55 #traceback.print_exc()
57 class TestDownload(BaseTest
):
61 stream
= tempfile
.TemporaryFile()
62 stream
.write(data
.thomas_key
)
64 gpg
.import_key(stream
)
67 trust
.trust_db
.watchers
= []
70 BaseTest
.tearDown(self
)
71 if self
.child
is not None:
72 os
.kill(self
.child
, signal
.SIGTERM
)
73 os
.waitpid(self
.child
, 0)
76 def testRejectKey(self
):
79 sys
.stdout
= StringIO()
80 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
81 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
82 handler
= DummyHandler())
83 assert policy
.need_download()
84 sys
.stdin
= Reply("N\n")
86 policy
.download_and_execute(['Hello'])
88 except model
.SafeException
, ex
:
89 if "Not signed with a trusted key" not in str(ex
):
94 def testRejectKeyXML(self
):
97 sys
.stdout
= StringIO()
98 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
99 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False,
100 handler
= DummyHandler())
101 assert policy
.need_download()
102 sys
.stdin
= Reply("N\n")
104 policy
.download_and_execute(['Hello'])
106 except model
.SafeException
, ex
:
107 if "Not signed with a trusted key" not in str(ex
):
112 def testImport(self
):
115 from zeroinstall
.injector
import cli
118 rootLogger
= getLogger()
119 rootLogger
.disabled
= True
122 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
124 except model
.SafeException
, ex
:
125 assert 'NO-SUCH-FILE' in str(ex
)
127 rootLogger
.disabled
= False
128 rootLogger
.setLevel(WARN
)
130 hello
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello')
131 self
.assertEquals(0, len(hello
.implementations
))
133 sys
.stdout
= StringIO()
134 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
135 sys
.stdin
= Reply("Y\n")
137 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
138 cli
.main(['--import', 'Hello'])
139 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
141 # Check we imported the interface after trusting the key
142 reader
.update_from_cache(hello
)
143 self
.assertEquals(1, len(hello
.implementations
))
145 # Shouldn't need to prompt the second time
147 cli
.main(['--import', 'Hello'])
151 def testAcceptKey(self
):
154 sys
.stdout
= StringIO()
155 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
156 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
157 handler
= DummyHandler())
158 assert policy
.need_download()
159 sys
.stdin
= Reply("Y\n")
161 policy
.download_and_execute(['Hello'], main
= 'Missing')
163 except model
.SafeException
, ex
:
164 if "HelloWorld/Missing" not in str(ex
):
169 def testRecipe(self
):
172 sys
.stdout
= StringIO()
173 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
174 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False)
176 policy
.download_and_execute([])
178 except model
.SafeException
, ex
:
179 if "HelloWorld/Missing" not in str(ex
):
184 def testSymlink(self
):
187 sys
.stdout
= StringIO()
188 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
189 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('RecipeSymlink.xml'), download_only
= False,
190 handler
= DummyHandler())
192 policy
.download_and_execute([])
194 except model
.SafeException
, ex
:
195 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
197 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
201 def testAutopackage(self
):
204 sys
.stdout
= StringIO()
205 self
.child
= server
.handle_requests('HelloWorld.autopackage')
206 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Autopackage.xml'), download_only
= False)
208 policy
.download_and_execute([])
210 except model
.SafeException
, ex
:
211 if "HelloWorld/Missing" not in str(ex
):
216 def testRecipeFailure(self
):
219 sys
.stdout
= StringIO()
220 self
.child
= server
.handle_requests('*')
221 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False,
222 handler
= DummyHandler())
224 policy
.download_and_execute([])
226 except download
.DownloadError
, ex
:
227 if "Connection" not in str(ex
):
232 def testMirrors(self
):
235 sys
.stdout
= StringIO()
236 getLogger().setLevel(ERROR
)
237 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
238 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
239 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False)
240 policy
.fetcher
.feed_mirror
= 'http://localhost:8000/0mirror'
242 refreshed
= policy
.solve_with_downloads()
243 policy
.handler
.wait_for_blocker(refreshed
)
248 def testReplay(self
):
251 sys
.stdout
= StringIO()
252 getLogger().setLevel(ERROR
)
253 iface
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello.xml')
254 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
255 iface_cache
.iface_cache
.update_interface_from_network(iface
, file('Hello-new.xml').read(), mtime
+ 10000)
257 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
258 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
259 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False)
260 policy
.fetcher
.feed_mirror
= 'http://localhost:8000/0mirror'
262 # Update from mirror (should ignore out-of-date timestamp)
263 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
264 policy
.handler
.wait_for_blocker(refreshed
)
266 # Update from upstream (should report an error)
267 refreshed
= policy
.fetcher
.download_and_import_feed(iface
.uri
, iface_cache
.iface_cache
)
269 policy
.handler
.wait_for_blocker(refreshed
)
270 raise Exception("Should have been rejected!")
271 except model
.SafeException
, ex
:
272 assert "New interface's modification time is before old version" in str(ex
)
274 # Must finish with the newest version
275 self
.assertEquals(1209206132, iface_cache
.iface_cache
._get
_signature
_date
(iface
.uri
))
279 def testBackground(self
, verbose
= False):
280 p
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml')
281 reader
.update(iface_cache
.iface_cache
.get_interface(p
.root
), 'Hello.xml')
283 p
.network_use
= model
.network_minimal
284 p
.solver
.solve(p
.root
, arch
.get_host_architecture())
288 def choose_download(registed_cb
, nid
, actions
):
290 assert actions
== ['download', 'Download'], actions
291 registed_cb(nid
, 'download')
294 traceback
.print_exc()
301 sys
.stdout
= StringIO()
302 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
303 my_dbus
.user_callback
= choose_download
307 # The background handler runs in the same process
308 # as the tests, so don't let it abort.
309 if os
.getpid() == pid
:
310 raise SystemExit(code
)
311 # But, child download processes are OK
316 background
.spawn_background_update(p
, verbose
)
318 except SystemExit, ex
:
319 self
.assertEquals(1, ex
.code
)
326 def testBackgroundVerbose(self
):
327 self
.testBackground(verbose
= True)
# Module-level suite holding the tests of TestDownload (methods whose names
# start with the loader's default "test" prefix).
# TestLoader.loadTestsFromTestCase is the documented loader API;
# unittest.makeSuite was only a thin wrapper around it, is deprecated, and
# was removed in Python 3.13.
suite = unittest.TestLoader().loadTestsFromTestCase(TestDownload)
330 if __name__
== '__main__':
331 sys
.argv
.append('-v')