#!/usr/bin/env python2.4
from basetest import BaseTest
import sys, tempfile, os, shutil
from StringIO import StringIO
import unittest, signal
from logging import getLogger, DEBUG, INFO, WARN, ERROR

# Put the parent directory first on the module search path. This has to run
# before the zeroinstall imports below.
# NOTE(review): presumably so the in-tree zeroinstall package is the one
# that gets imported -- confirm.
sys.path.insert(0, '..')

from zeroinstall.injector import model, autopolicy, gpg, iface_cache, download, reader, trust, handler
from zeroinstall.zerostore import Store
# Replace Store._add_with_helper with a stub that accepts any arguments and
# always returns False.
# NOTE(review): presumably keeps the tests from ever going through the
# store helper -- confirm against zeroinstall.zerostore.
Store._add_with_helper = lambda *unused: False
from zeroinstall.support import basedir
18 def __init__(self
, reply
):
24 class DummyHandler(handler
.Handler
):
25 __slots__
= ['ex', 'tb']
28 handler
.Handler
.__init
__(self
)
31 def wait_for_blocker(self
, blocker
):
33 handler
.Handler
.wait_for_blocker(self
, blocker
)
35 raise self
.ex
, None, self
.tb
37 def report_error(self
, ex
, tb
= None):
38 assert self
.ex
is None, self
.ex
43 #traceback.print_exc()
45 class TestDownload(BaseTest
):
49 stream
= tempfile
.TemporaryFile()
50 stream
.write(data
.thomas_key
)
52 gpg
.import_key(stream
)
55 trust
.trust_db
.watchers
= []
58 BaseTest
.tearDown(self
)
59 if self
.child
is not None:
60 os
.kill(self
.child
, signal
.SIGTERM
)
61 os
.waitpid(self
.child
, 0)
64 def testRejectKey(self
):
67 sys
.stdout
= StringIO()
68 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
69 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
70 handler
= DummyHandler())
71 assert policy
.need_download()
72 sys
.stdin
= Reply("N\n")
74 policy
.download_and_execute(['Hello'])
76 except model
.SafeException
, ex
:
77 if "Not signed with a trusted key" not in str(ex
):
82 def testRejectKeyXML(self
):
85 sys
.stdout
= StringIO()
86 self
.child
= server
.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
87 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False,
88 handler
= DummyHandler())
89 assert policy
.need_download()
90 sys
.stdin
= Reply("N\n")
92 policy
.download_and_execute(['Hello'])
94 except model
.SafeException
, ex
:
95 if "Not signed with a trusted key" not in str(ex
):
100 def testImport(self
):
103 from zeroinstall
.injector
import cli
106 rootLogger
= getLogger()
107 rootLogger
.disabled
= True
110 cli
.main(['--import', '-v', 'NO-SUCH-FILE'])
112 except model
.SafeException
, ex
:
113 assert 'NO-SUCH-FILE' in str(ex
)
115 rootLogger
.disabled
= False
116 rootLogger
.setLevel(WARN
)
118 hello
= iface_cache
.iface_cache
.get_interface('http://localhost:8000/Hello')
119 self
.assertEquals(0, len(hello
.implementations
))
121 sys
.stdout
= StringIO()
122 self
.child
= server
.handle_requests('6FCF121BE2390E0B.gpg')
123 sys
.stdin
= Reply("Y\n")
125 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
126 cli
.main(['--import', 'Hello'])
127 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
129 # Check we imported the interface after trusting the key
130 reader
.update_from_cache(hello
)
131 self
.assertEquals(1, len(hello
.implementations
))
133 # Shouldn't need to prompt the second time
135 cli
.main(['--import', 'Hello'])
139 def testAcceptKey(self
):
142 sys
.stdout
= StringIO()
143 self
.child
= server
.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
144 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello', download_only
= False,
145 handler
= DummyHandler())
146 assert policy
.need_download()
147 sys
.stdin
= Reply("Y\n")
149 policy
.download_and_execute(['Hello'], main
= 'Missing')
151 except model
.SafeException
, ex
:
152 if "HelloWorld/Missing" not in str(ex
):
157 def testRecipe(self
):
160 sys
.stdout
= StringIO()
161 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
162 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False)
164 policy
.download_and_execute([])
166 except model
.SafeException
, ex
:
167 if "HelloWorld/Missing" not in str(ex
):
172 def testSymlink(self
):
175 sys
.stdout
= StringIO()
176 self
.child
= server
.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
177 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('RecipeSymlink.xml'), download_only
= False,
178 handler
= DummyHandler())
180 policy
.download_and_execute([])
182 except model
.SafeException
, ex
:
183 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
185 self
.assertEquals(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
189 def testAutopackage(self
):
192 sys
.stdout
= StringIO()
193 self
.child
= server
.handle_requests('HelloWorld.autopackage')
194 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Autopackage.xml'), download_only
= False)
196 policy
.download_and_execute([])
198 except model
.SafeException
, ex
:
199 if "HelloWorld/Missing" not in str(ex
):
204 def testRecipeFailure(self
):
207 sys
.stdout
= StringIO()
208 self
.child
= server
.handle_requests('*')
209 policy
= autopolicy
.AutoPolicy(os
.path
.abspath('Recipe.xml'), download_only
= False,
210 handler
= DummyHandler())
212 policy
.download_and_execute([])
214 except download
.DownloadError
, ex
:
215 if "Connection" not in str(ex
):
220 def testMirrors(self
):
223 sys
.stdout
= StringIO()
224 getLogger().setLevel(ERROR
)
225 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'localhost:8000')
226 self
.child
= server
.handle_requests(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg')
227 policy
= autopolicy
.AutoPolicy('http://localhost:8000/Hello.xml', download_only
= False)
228 policy
.fetcher
.feed_mirror
= 'http://localhost:8000/0mirror'
230 refreshed
= policy
.solve_with_downloads([])
231 errors
= policy
.handler
.wait_for_blocker(refreshed
)
233 raise model
.SafeException("Errors during download: " + '\n'.join(errors
))
239 suite
= unittest
.makeSuite(TestDownload
)
240 if __name__
== '__main__':
241 sys
.argv
.append('-v')