Fixed handling of missing local feeds
[zeroinstall.git] / tests / teststore.py
blob d9fc65fe95042ac6dedbd575a29cbbff23979420
1 #!/usr/bin/env python
2 from basetest import BaseTest
3 import sys, tempfile, os
4 from StringIO import StringIO
5 import unittest
6 import logging
8 logger = logging.getLogger()
10 sys.path.insert(0, '..')
12 from zeroinstall.zerostore import Store, manifest, BadDigest, cli, NotStored
13 from zeroinstall import SafeException, support
15 mydir = os.path.dirname(__file__)
17 class TestStore(BaseTest):
def setUp(self):
    """Create a private store and a scratch directory for one test.

    Sets three attributes used by the tests:
      self.store_parent -- temporary directory that contains the store
      self.store        -- Store rooted at <store_parent>/implementations
      self.tmp          -- empty temporary directory for test fixtures
    """
    BaseTest.setUp(self)

    # mkdtemp() creates the directory atomically (mode 0700). The previous
    # mktemp() + mkdir() pair only reserved a *name*, leaving a window in
    # which another process could create that path first.
    self.store_parent = tempfile.mkdtemp()
    self.store = Store(self.store_parent + '/implementations')
    os.mkdir(self.store.dir, 0o700)

    self.tmp = tempfile.mkdtemp()
def tearDown(self):
    """Remove the directories made by setUp and reset the CLI's store list."""
    BaseTest.tearDown(self)

    # The tests leave read-only files and directories behind (they chmod
    # entries to 0500/0555), hence ro_rmtree for the clean-up.
    for temp_dir in (self.store_parent, self.tmp):
        support.ro_rmtree(temp_dir)

    # Forget any stores that a test set up through cli.init_stores().
    cli.stores = None
def testInit(self):
    """The store directory made by setUp exists and starts out empty."""
    store_dir = self.store.dir
    assert os.path.isdir(store_dir)
    entries = os.listdir(store_dir)
    self.assertEqual([], entries)
def testEmptyManifest(self):
    """Generating a manifest for an empty directory yields no lines."""
    self.assertEqual([], list(manifest.generate_manifest(self.tmp)))
def testSimpleManifest(self):
    """A directory holding one regular file yields a single 'F' line."""
    path = os.path.join(self.tmp, 'MyFile')
    # 'with' guarantees the handle is closed even if the write fails.
    with open(path, 'w') as stream:
        stream.write('Hello')
    # Pin atime=1, mtime=2 so the expected line below is reproducible
    # (it ends with "2 5 MyFile"; 5 is the length of 'Hello').
    os.utime(path, (1, 2))
    lines = list(manifest.generate_manifest(self.tmp))
    self.assertEqual(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile'],
                     lines)
def testLinkManifest(self):
    """A directory holding one symlink yields a single 'S' line."""
    link_path = os.path.join(self.tmp, 'MyLink')
    os.symlink('Hello', link_path)
    manifest_lines = list(manifest.generate_manifest(self.tmp))
    expected = ['S f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 5 MyLink']
    self.assertEqual(expected, manifest_lines)
def testVerify(self):
    """Write and verify a manifest with each of sha1, sha256 and sha1new.

    For every algorithm the '.manifest' file written by add_manifest_file
    must contain exactly one 'S' line for the symlink, and manifest.verify
    must accept the directory under the digest that was returned.

    Raises:
        Exception: if verification raises BadDigest; the message names the
            algorithm and includes the BadDigest detail.
    """
    path = os.path.join(self.tmp, 'MyLink')
    os.symlink('Hello', path)
    mfile = os.path.join(self.tmp, '.manifest')
    for alg_name in ['sha1', 'sha256', 'sha1new']:
        try:
            alg = manifest.get_algorithm(alg_name)
            added_digest = alg.getID(manifest.add_manifest_file(self.tmp, alg))
            digest = alg.new_digest()
            digest.update('Hello')
            # Read through a context manager so the handle is closed
            # before the file is unlinked below.
            with open(mfile, 'rb') as stream:
                manifest_data = stream.read()
            self.assertEqual("S %s 5 MyLink\n" % digest.hexdigest(),
                             manifest_data)
            manifest.verify(self.tmp, added_digest)
            # Make the directory writable again so the manifest can be
            # removed before the next algorithm is tried.
            os.chmod(self.tmp, 0o700)
            os.unlink(mfile)
        except BadDigest as ex:
            raise Exception("%s: %s\n%s" % (alg_name, ex, ex.detail))
def populate_sample(self, target):
    """Create a set of files, links and directories in target for testing.

    The following entries are created inside the existing directory
    `target`:

      MyFile               regular file containing 'Hello'
      My Dir/              sub-directory
      My Dir/!a file!      regular file containing 'Some data.'
      My Dir/!a file!.exe  file containing 'Some code.', mode 0500
      a symlink            symlink pointing at '/the/symlink/target'

    Every regular file gets atime 1 and mtime 2 so that results derived
    from the tree do not depend on when the test ran.
    """
    path = os.path.join(target, 'MyFile')
    with open(path, 'w') as stream:
        stream.write('Hello')
    os.utime(path, (1, 2))

    subdir = os.path.join(target, 'My Dir')
    os.mkdir(subdir)

    subfile = os.path.join(subdir, '!a file!')
    with open(subfile, 'w') as stream:
        stream.write('Some data.')
    os.utime(subfile, (1, 2))

    subfile += '.exe'
    with open(subfile, 'w') as stream:
        stream.write('Some code.')
    # Owner read+execute only; set after writing because the file is no
    # longer writable afterwards.
    os.chmod(subfile, 0o500)
    os.utime(subfile, (1, 2))

    os.symlink('/the/symlink/target',
               os.path.join(target, 'a symlink'))
def testAdd(self):
    """Exercise add, find, manifest, verify and audit through the CLI.

    Steps: the digest is not stored initially; adding under a wrong digest
    raises BadDigest; adding under the right digest succeeds; 'find'
    prints the cached path; 'manifest' output ends with the digest;
    'verify' and 'audit' pass; after an extra file is dropped into the
    cached copy, 'verify' and 'audit' report the corruption.

    sys.stdout and the root logger level are restored in 'finally' blocks
    so that a failure part-way through cannot leave stdout redirected or
    logging suppressed for later tests.
    """
    sample = os.path.join(self.tmp, 'sample')
    os.mkdir(sample)
    self.populate_sample(sample)
    cli.init_stores()
    digest = 'sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a'
    try:
        cli.stores.lookup(digest)
        assert False
    except NotStored:
        pass

    # Raise the log threshold while the deliberately wrong digest is added.
    logger.setLevel(logging.ERROR)
    try:
        try:
            cli.do_add([digest + "b", sample])
            assert False
        except BadDigest:
            pass
    finally:
        logger.setLevel(logging.WARN)

    old_stdout = sys.stdout
    try:
        cli.do_add([digest, sample])
        sys.stdout = StringIO()
        try:
            cli.do_find([digest])
            assert False
        except SystemExit as ex:
            assert ex.code == 0
        cached = sys.stdout.getvalue().strip()
        assert cached == cli.stores.lookup(digest)

        # Once with the default algorithm, once naming it explicitly.
        for alg in [[], ['sha1new']]:
            sys.stdout = StringIO()
            try:
                cli.do_manifest([cached] + alg)
                assert False
            except SystemExit as ex:
                assert ex.code == 0
            result = sys.stdout.getvalue()
            assert 'MyFile' in result
            # The last non-empty output line is the digest itself.
            assert result.split('\n')[-2] == digest

        # Verify...
        sys.stdout = StringIO()
        cli.do_verify([cached, digest])
        cli.do_verify([cached])
        cli.do_verify([digest])

        # Full audit
        cli.do_audit([os.path.dirname(cached)])

        # Corrupt it...
        os.chmod(cached, 0o700)
        open(os.path.join(cached, 'hacked'), 'w').close()

        # Verify again...
        sys.stdout = StringIO()
        try:
            cli.do_verify([cached, digest])
            assert False
        except SystemExit as ex:
            assert ex.code == 1
        result = sys.stdout.getvalue()
        sys.stdout = old_stdout
        assert 'Cached item does NOT verify' in result

        # Full audit
        # NOTE(review): unlike the checks above there is no 'assert False'
        # after do_audit, so a missing SystemExit is tolerated here --
        # confirm whether that is intentional.
        sys.stdout = StringIO()
        try:
            cli.do_audit([os.path.dirname(cached)])
        except SystemExit as ex:
            assert ex.code == 1
        result = sys.stdout.getvalue()
        sys.stdout = old_stdout
        assert 'Cached item does NOT verify' in result
    finally:
        # Always undo the redirection, whichever step failed.
        sys.stdout = old_stdout
def testList(self):
    """The 'list' command's output mentions the user store."""
    cli.init_stores()

    old_stdout = sys.stdout
    sys.stdout = StringIO()
    try:
        cli.do_list([])
        result = sys.stdout.getvalue()
    finally:
        # Restore stdout even if do_list raises, so later tests (and the
        # test runner's own report) are not written into the StringIO.
        sys.stdout = old_stdout
    assert 'User store' in result
def testAddArchive(self):
    """Add HelloWorld.tgz under its digest and look it up afterwards."""
    cli.init_stores()
    digest = 'sha1new=290eb133e146635fe37713fd58174324a16d595f'
    archive = os.path.join(mydir, 'HelloWorld.tgz')

    # The implementation must not be cached before the archive is added.
    try:
        cli.stores.lookup(digest)
    except NotStored:
        pass
    else:
        assert False

    # The archive is added twice: the second add must also succeed.
    for _attempt in (1, 2):
        cli.do_add([digest, archive])
    cli.stores.lookup(digest)
def testOptimise(self):
    """'optimise' merges identical files across cached implementations.

    Two implementations are cached that differ only in
    'My Dir/!a file!.exe'. After the first optimise run the unchanged
    'My Dir/!a file!' shares an inode between both copies while the
    modified '.exe' does not; a second run finds nothing left to do.
    """
    digest_a = 'sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a'
    digest_b = 'sha1new=40861a33dba4e7c26d37505bd9693511808c0c35'

    sample = os.path.join(self.tmp, 'sample')
    os.mkdir(sample)
    self.populate_sample(sample)
    self.store.add_dir_to_cache(digest_a, sample, try_helper=False)

    # Change one file's contents (keeping its mtime) to obtain a second,
    # slightly different implementation.
    subfile = os.path.join(sample, 'My Dir', '!a file!.exe')
    mtime = os.stat(subfile).st_mtime
    os.chmod(subfile, 0o755)
    with open(subfile, 'w') as stream:
        stream.write('Extra!\n')
    os.utime(subfile, (mtime, mtime))
    self.store.add_dir_to_cache(digest_b, sample, try_helper=False)

    impl_a = self.store.lookup(digest_a)
    impl_b = self.store.lookup(digest_b)

    def same_inode(name):
        """Return True if `name` is the same inode in both implementations."""
        info_a = os.lstat(os.path.join(impl_a, name))
        info_b = os.lstat(os.path.join(impl_b, name))
        return info_a.st_ino == info_b.st_ino

    def optimise_output():
        """Run 'optimise' on the store and return what it printed."""
        old_stdout = sys.stdout
        sys.stdout = StringIO()
        try:
            cli.do_optimise([self.store.dir])
            return sys.stdout.getvalue()
        finally:
            sys.stdout = old_stdout

    assert not same_inode('My Dir/!a file!')
    assert not same_inode('My Dir/!a file!.exe')

    got = optimise_output()
    assert 'Space freed up : 15 bytes' in got

    got = optimise_output()
    assert 'No duplicates found; no changes made.' in got

    assert same_inode('My Dir/!a file!')
    assert not same_inode('My Dir/!a file!.exe')
def testCopy(self):
    """Exercise the 'copy' command with bad, sha1 and sha1new sources.

    A source directory whose name is not of the form alg=value is rejected
    with BadDigest; a second manifest cannot be added while '.manifest'
    exists; a directory named after its sha1new digest is copied and its
    contents are readable from the destination.
    """
    sha1 = manifest.get_algorithm('sha1')
    sha1new = manifest.get_algorithm('sha1new')
    source = os.path.join(self.tmp, 'badname')
    os.mkdir(source)

    self.populate_sample(source)

    lines = list(sha1new.generate_manifest(source))
    self.assertEqual(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile',
                      'S 570b0ce957ab43e774c82fca0ea3873fc452278b 19 a symlink',
                      'D /My Dir',
                      'F 0236ef92e1e37c57f0eb161e7e2f8b6a8face705 2 10 !a file!',
                      'X b4ab02f2c791596a980fd35f51f5d92ee0b4705c 2 10 !a file!.exe'],
                     lines)
    digest = sha1.getID(manifest.add_manifest_file(source, sha1))

    # mkdtemp() creates the destination atomically; mktemp() + mkdir()
    # left a window in which the name could be taken by someone else.
    copy = tempfile.mkdtemp()
    try:
        # Source must be in the form alg=value
        try:
            cli.do_copy([source, copy])
            assert 0
        except BadDigest as ex:
            assert 'badname' in str(ex)
        source, badname = os.path.join(self.tmp, digest), source
        os.chmod(badname, 0o755)    # can't rename RO directories on MacOS X
        os.rename(badname, source)
        os.chmod(source, 0o555)

        # Can't copy sha1 implementations (unsafe)
        # NOTE(review): there is no 'assert 0' after do_copy here, so a
        # copy that unexpectedly succeeds is not reported -- confirm
        # whether that is intentional.
        try:
            cli.do_copy([source, copy])
        except SafeException as ex:
            assert 'sha1' in str(ex)

        # Already have a .manifest
        try:
            manifest.add_manifest_file(source, sha1new)
            assert 0
        except SafeException as ex:
            assert '.manifest' in str(ex)

        os.chmod(source, 0o700)
        os.unlink(os.path.join(source, '.manifest'))

        # Switch to sha1new
        digest = sha1new.getID(manifest.add_manifest_file(source, sha1new))
        source, badname = os.path.join(self.tmp, digest), source
        os.chmod(badname, 0o755)
        os.rename(badname, source)
        os.chmod(source, 0o555)

        cli.do_copy([source, copy])

        # Close the copied file explicitly instead of leaking the handle.
        with open(os.path.join(copy, digest, 'MyFile')) as stream:
            copied_data = stream.read()
        self.assertEqual('Hello', copied_data)
    finally:
        support.ro_rmtree(copy)
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()