Generalise mirror fallback code.
[zeroinstall/zeroinstall-mseaborn.git] / tests / teststore.py
blob29f4349b451ff5a9be232f5a8d8fdfdba86918cc
1 #!/usr/bin/env python2.4
2 from basetest import BaseTest
3 import sys, tempfile, os, shutil
4 from StringIO import StringIO
5 import unittest
6 from logging import getLogger, DEBUG, INFO
7 #getLogger().setLevel(DEBUG)
9 sys.path.insert(0, '..')
11 from zeroinstall.zerostore import Store, manifest, BadDigest, cli
12 from zeroinstall import SafeException, support
14 class TestStore(BaseTest):
15 def setUp(self):
16 BaseTest.setUp(self)
18 self.store_parent = tempfile.mktemp()
19 os.mkdir(self.store_parent, 0700)
20 self.store = Store(self.store_parent + '/implementations')
21 os.mkdir(self.store.dir, 0700)
23 self.tmp = tempfile.mktemp()
24 os.mkdir(self.tmp)
26 def tearDown(self):
27 BaseTest.tearDown(self)
29 support.ro_rmtree(self.store_parent)
30 support.ro_rmtree(self.tmp)
32 def testInit(self):
33 assert os.path.isdir(self.store.dir)
34 self.assertEquals([], os.listdir(self.store.dir))
36 def testEmptyManifest(self):
37 lines = list(manifest.generate_manifest(self.tmp))
38 self.assertEquals([], lines)
40 def testSimpleManifest(self):
41 path = os.path.join(self.tmp, 'MyFile')
42 f = file(path, 'w')
43 f.write('Hello')
44 f.close()
45 os.utime(path, (1, 2))
46 lines = list(manifest.generate_manifest(self.tmp))
47 self.assertEquals(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile'],
48 lines)
50 def testLinkManifest(self):
51 path = os.path.join(self.tmp, 'MyLink')
52 os.symlink('Hello', path)
53 lines = list(manifest.generate_manifest(self.tmp))
54 self.assertEquals(['S f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 5 MyLink'],
55 lines)
57 def testVerify(self):
58 path = os.path.join(self.tmp, 'MyLink')
59 os.symlink('Hello', path)
60 mfile = os.path.join(self.tmp, '.manifest')
61 for alg_name in ['sha1', 'sha256', 'sha1new']:
62 try:
63 alg = manifest.get_algorithm(alg_name)
64 added_digest = alg.getID(manifest.add_manifest_file(self.tmp, alg))
65 digest = alg.new_digest()
66 digest.update('Hello')
67 self.assertEquals("S %s 5 MyLink\n" % digest.hexdigest(),
68 file(mfile).read())
69 manifest.verify(self.tmp, added_digest)
70 os.chmod(self.tmp, 0700)
71 os.unlink(mfile)
72 except BadDigest, ex:
73 raise Exception("%s: %s\n%s" % (alg_name, ex, ex.detail))
75 def populate_sample(self, target):
76 """Create a set of files, links and directories in target for testing."""
77 path = os.path.join(target, 'MyFile')
78 f = file(path, 'w')
79 f.write('Hello')
80 f.close()
81 os.utime(path, (1, 2))
83 subdir = os.path.join(target, 'My Dir')
84 os.mkdir(subdir)
86 subfile = os.path.join(subdir, '!a file!')
87 f = file(subfile, 'w')
88 f.write('Some data.')
89 f.close()
90 os.utime(subfile, (1, 2))
92 subfile += '.exe'
93 f = file(subfile, 'w')
94 f.write('Some code.')
95 f.close()
96 os.chmod(subfile, 0500)
97 os.utime(subfile, (1, 2))
99 os.symlink('/the/symlink/target',
100 os.path.join(target, 'a symlink'))
102 def testOptimise(self):
103 sample = os.path.join(self.tmp, 'sample')
104 os.mkdir(sample)
105 self.populate_sample(sample)
106 self.store.add_dir_to_cache('sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a',
107 sample,
108 try_helper = False)
109 subfile = os.path.join(sample, 'My Dir', '!a file!.exe')
110 mtime = os.stat(subfile).st_mtime
111 os.chmod(subfile, 0755)
112 stream = file(subfile, 'w')
113 stream.write('Extra!\n')
114 stream.close()
115 os.utime(subfile, (mtime, mtime))
116 self.store.add_dir_to_cache('sha1new=40861a33dba4e7c26d37505bd9693511808c0c35',
117 sample,
118 try_helper = False)
120 impl_a = self.store.lookup('sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a')
121 impl_b = self.store.lookup('sha1new=40861a33dba4e7c26d37505bd9693511808c0c35')
123 def same_inode(name):
124 info_a = os.lstat(os.path.join(impl_a, name))
125 info_b = os.lstat(os.path.join(impl_b, name))
126 return info_a.st_ino == info_b.st_ino
128 assert not same_inode('My Dir/!a file!')
129 assert not same_inode('My Dir/!a file!.exe')
131 old_stdout = sys.stdout
132 sys.stdout = StringIO()
133 try:
134 cli.do_optimise([self.store.dir])
135 got = sys.stdout.getvalue()
136 finally:
137 sys.stdout = old_stdout
138 assert 'Space freed up : 15 bytes' in got
140 old_stdout = sys.stdout
141 sys.stdout = StringIO()
142 try:
143 cli.do_optimise([self.store.dir])
144 got = sys.stdout.getvalue()
145 finally:
146 sys.stdout = old_stdout
147 assert 'No duplicates found; no changes made.' in got
149 assert same_inode('My Dir/!a file!')
150 assert not same_inode('My Dir/!a file!.exe')
152 def testCopy(self):
153 sha1 = manifest.get_algorithm('sha1')
154 sha1new = manifest.get_algorithm('sha1new')
155 source = os.path.join(self.tmp, 'badname')
156 os.mkdir(source)
158 self.populate_sample(source)
160 lines = list(sha1new.generate_manifest(source))
161 self.assertEquals(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile',
162 'S 570b0ce957ab43e774c82fca0ea3873fc452278b 19 a symlink',
163 'D /My Dir',
164 'F 0236ef92e1e37c57f0eb161e7e2f8b6a8face705 2 10 !a file!',
165 'X b4ab02f2c791596a980fd35f51f5d92ee0b4705c 2 10 !a file!.exe'],
166 lines)
167 digest = sha1.getID(manifest.add_manifest_file(source, sha1))
169 copy = tempfile.mktemp()
170 os.mkdir(copy)
171 try:
172 # Source must be in the form alg=value
173 try:
174 cli.do_copy([source, copy])
175 assert 0
176 except BadDigest, ex:
177 assert 'badname' in str(ex)
178 source, badname = os.path.join(self.tmp, digest), source
179 os.rename(badname, source)
181 # Can't copy sha1 implementations (unsafe)
182 try:
183 cli.do_copy([source, copy])
184 except SafeException, ex:
185 assert 'sha1' in str(ex)
187 # Already have a .manifest
188 try:
189 manifest.add_manifest_file(source, sha1new)
190 assert 0
191 except SafeException, ex:
192 assert '.manifest' in str(ex)
194 os.chmod(source, 0700)
195 os.unlink(os.path.join(source, '.manifest'))
197 # Switch to sha1new
198 digest = sha1new.getID(manifest.add_manifest_file(source, sha1new))
199 source, badname = os.path.join(self.tmp, digest), source
200 os.rename(badname, source)
202 cli.do_copy([source, copy])
204 self.assertEquals('Hello', file(os.path.join(copy, digest, 'MyFile')).read())
205 finally:
206 support.ro_rmtree(copy)
# Module-level suite containing every TestStore test (presumably picked up by
# an aggregate test runner elsewhere -- not visible in this file).
suite = unittest.makeSuite(TestStore)

if __name__ == '__main__':
    # When run directly as a script, force verbose unittest output.
    sys.argv.append('-v')
    unittest.main()