Massive refactoring for tasks system.
[zeroinstall/zeroinstall-rsl.git] / tests / testdownload.py
blobbec15a51866be8170d1bdebf725b4ca68d36c9f6
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest, signal
6 from logging import getLogger, DEBUG, INFO, WARN
8 sys.path.insert(0, '..')
10 from zeroinstall.injector import model, basedir, autopolicy, gpg, iface_cache, download, reader, trust, handler
11 from zeroinstall.zerostore import Store; Store._add_with_helper = lambda *unused: False
12 import data
14 import server
class Reply:
    """Minimal stand-in for sys.stdin that answers every readline() with one fixed string."""

    def __init__(self, reply):
        # The canned line handed back on every read.
        self.reply = reply

    def readline(self):
        """Return the canned reply; the same text is returned on every call."""
        return self.reply
class DummyHandler(handler.Handler):
    """Handler that records an error passed to report_error() and re-raises
    it from wait_for_blocker() once the base-class wait has returned."""

    __slots__ = ['ex']

    def __init__(self):
        handler.Handler.__init__(self)
        # Nothing has been reported yet.
        self.ex = None

    def wait_for_blocker(self, blocker):
        """Delegate the wait to handler.Handler, then raise any error that
        was reported while waiting."""
        # Forget whatever an earlier wait may have left behind.
        self.ex = None
        handler.Handler.wait_for_blocker(self, blocker)
        reported = self.ex
        if reported:
            raise reported

    def report_error(self, ex):
        """Remember ex; only one error may be pending at a time (asserted)."""
        assert self.ex is None, self.ex
        self.ex = ex

        # Debugging aid, left disabled:
        #import traceback
        #traceback.print_exc()
class TestDownload(BaseTest):
    """Download tests that run against the helper started by server.handle_requests().

    Each test stores the value returned by server.handle_requests() in
    self.child; tearDown() treats it as a process id, terminates it and
    reaps it.  Tests that replace sys.stdout or sys.stdin restore them
    before returning so that no global state leaks into the next test.
    """

    def setUp(self):
        """Import the test GPG key and reset per-test state."""
        BaseTest.setUp(self)

        stream = tempfile.TemporaryFile()
        stream.write(data.thomas_key)
        stream.seek(0)
        gpg.import_key(stream)
        self.child = None

        trust.trust_db.watchers = []

    def tearDown(self):
        """Run the base tearDown, then always stop the server child (if any)."""
        try:
            BaseTest.tearDown(self)
        finally:
            # Reap the child even when the base tearDown raised, otherwise
            # the server process would outlive the test run.
            if self.child is not None:
                os.kill(self.child, signal.SIGTERM)
                os.waitpid(self.child, 0)
                self.child = None

    def _expect_failure(self, exc_type, fragment, func, *args, **kwargs):
        """Call func(*args, **kwargs) and require it to raise exc_type.

        The exception is accepted only if str(exception) contains fragment;
        any other exc_type instance is re-raised with its original
        traceback.  If nothing is raised the test fails.
        """
        try:
            func(*args, **kwargs)
        except exc_type:
            # sys.exc_info() is used instead of "except X, ex" so that this
            # parses on Python 2.4 as well as on later versions.
            ex = sys.exc_info()[1]
            if fragment not in str(ex):
                # A bare raise keeps the traceback of the original failure.
                raise
        else:
            # self.fail() is not stripped under -O, unlike "assert 0".
            self.fail("%s was not raised" % exc_type.__name__)

    def testRejectKey(self):
        """Reply "N" on stdin; running 'Hello' must fail as not trusted."""
        old_out, old_in = sys.stdout, sys.stdin
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg')
            policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
                                           handler = DummyHandler())
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            self._expect_failure(model.SafeException, "Not signed with a trusted key",
                                 policy.download_and_execute, ['Hello'])
        finally:
            sys.stdout, sys.stdin = old_out, old_in

    def testRejectKeyXML(self):
        """Same as testRejectKey, but for the 'Hello.xml' URL."""
        old_out, old_in = sys.stdout, sys.stdin
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('Hello.xml', '6FCF121BE2390E0B.gpg')
            policy = autopolicy.AutoPolicy('http://localhost:8000/Hello.xml', download_only = False,
                                           handler = DummyHandler())
            assert policy.need_download()
            sys.stdin = Reply("N\n")
            self._expect_failure(model.SafeException, "Not signed with a trusted key",
                                 policy.download_and_execute, ['Hello'])
        finally:
            sys.stdout, sys.stdin = old_out, old_in

    def testImport(self):
        """Exercise "--import": missing file, first import (reply "Y"), re-import."""
        old_out, old_in = sys.stdout, sys.stdin
        try:
            from zeroinstall.injector import cli

            rootLogger = getLogger()
            rootLogger.disabled = True
            try:
                self._expect_failure(model.SafeException, 'NO-SUCH-FILE',
                                     cli.main, ['--import', '-v', 'NO-SUCH-FILE'])
            finally:
                rootLogger.disabled = False
                rootLogger.setLevel(WARN)

            hello = iface_cache.iface_cache.get_interface('http://localhost:8000/Hello')
            self.assertEqual(0, len(hello.implementations))

            sys.stdout = StringIO()
            self.child = server.handle_requests('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'])
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            reader.update_from_cache(hello)
            self.assertEqual(1, len(hello.implementations))

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'])
        finally:
            sys.stdout, sys.stdin = old_out, old_in

    def testAcceptKey(self):
        """Reply "Y" on stdin; running with main='Missing' must then fail
        with an error mentioning "HelloWorld/Missing"."""
        old_out, old_in = sys.stdout, sys.stdin
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('Hello', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
            policy = autopolicy.AutoPolicy('http://localhost:8000/Hello', download_only = False,
                                           handler = DummyHandler())
            assert policy.need_download()
            sys.stdin = Reply("Y\n")
            self._expect_failure(model.SafeException, "HelloWorld/Missing",
                                 policy.download_and_execute, ['Hello'], main = 'Missing')
        finally:
            sys.stdout, sys.stdin = old_out, old_in

    def testRecipe(self):
        """Run Recipe.xml; the run must fail mentioning "HelloWorld/Missing"."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tar.bz2', 'dummy_1-1_all.deb'))
            policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False)
            self._expect_failure(model.SafeException, "HelloWorld/Missing",
                                 policy.download_and_execute, [])
        finally:
            sys.stdout = old_out

    def testSymlink(self):
        """Run RecipeSymlink.xml; unpacking must be refused and nothing cached."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            policy = autopolicy.AutoPolicy(os.path.abspath('RecipeSymlink.xml'), download_only = False,
                                           handler = DummyHandler())
            self._expect_failure(model.SafeException,
                                 'Attempt to unpack dir over symlink "HelloWorld"',
                                 policy.download_and_execute, [])
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        """Run Autopackage.xml; the run must fail mentioning "HelloWorld/Missing"."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('HelloWorld.autopackage')
            policy = autopolicy.AutoPolicy(os.path.abspath('Autopackage.xml'), download_only = False)
            self._expect_failure(model.SafeException, "HelloWorld/Missing",
                                 policy.download_and_execute, [])
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        """With the server given '*', Recipe.xml must fail with a
        DownloadError mentioning "Connection"."""
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            self.child = server.handle_requests('*')
            policy = autopolicy.AutoPolicy(os.path.abspath('Recipe.xml'), download_only = False,
                                           handler = DummyHandler())
            self._expect_failure(download.DownloadError, "Connection",
                                 policy.download_and_execute, [])
        finally:
            sys.stdout = old_out
# Module-level suite holding the tests collected from TestDownload.
suite = unittest.makeSuite(TestDownload)

if __name__ == '__main__':
    # Run verbosely whenever this file is executed directly.
    sys.argv += ['-v']
    unittest.main()