2 from basetest
import BaseTest
3 import sys
, tempfile
, os
4 from StringIO
import StringIO
8 logger
= logging
.getLogger()
10 sys
.path
.insert(0, '..')
12 from zeroinstall
.zerostore
import Store
, manifest
, BadDigest
, cli
, NotStored
13 from zeroinstall
import SafeException
, support
15 mydir
= os
.path
.dirname(__file__
)
17 class TestStore(BaseTest
):
21 self
.store_parent
= tempfile
.mktemp()
22 os
.mkdir(self
.store_parent
, 0o700)
23 self
.store
= Store(self
.store_parent
+ '/implementations')
24 os
.mkdir(self
.store
.dir, 0o700)
26 self
.tmp
= tempfile
.mktemp()
30 BaseTest
.tearDown(self
)
32 support
.ro_rmtree(self
.store_parent
)
33 support
.ro_rmtree(self
.tmp
)
38 assert os
.path
.isdir(self
.store
.dir)
39 self
.assertEqual([], os
.listdir(self
.store
.dir))
41 def testEmptyManifest(self
):
42 lines
= list(manifest
.generate_manifest(self
.tmp
))
43 self
.assertEqual([], lines
)
45 def testSimpleManifest(self
):
46 path
= os
.path
.join(self
.tmp
, 'MyFile')
50 os
.utime(path
, (1, 2))
51 lines
= list(manifest
.generate_manifest(self
.tmp
))
52 self
.assertEqual(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile'],
55 def testLinkManifest(self
):
56 path
= os
.path
.join(self
.tmp
, 'MyLink')
57 os
.symlink('Hello', path
)
58 lines
= list(manifest
.generate_manifest(self
.tmp
))
59 self
.assertEqual(['S f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 5 MyLink'],
63 path
= os
.path
.join(self
.tmp
, 'MyLink')
64 os
.symlink('Hello', path
)
65 mfile
= os
.path
.join(self
.tmp
, '.manifest')
66 for alg_name
in ['sha1', 'sha256', 'sha1new']:
68 alg
= manifest
.get_algorithm(alg_name
)
69 added_digest
= alg
.getID(manifest
.add_manifest_file(self
.tmp
, alg
))
70 digest
= alg
.new_digest()
71 digest
.update('Hello')
72 self
.assertEqual("S %s 5 MyLink\n" % digest
.hexdigest(),
73 open(mfile
, 'rb').read())
74 manifest
.verify(self
.tmp
, added_digest
)
75 os
.chmod(self
.tmp
, 0o700)
77 except BadDigest
as ex
:
78 raise Exception("%s: %s\n%s" % (alg_name
, ex
, ex
.detail
))
80 def populate_sample(self
, target
):
81 """Create a set of files, links and directories in target for testing."""
82 path
= os
.path
.join(target
, 'MyFile')
86 os
.utime(path
, (1, 2))
88 subdir
= os
.path
.join(target
, 'My Dir')
91 subfile
= os
.path
.join(subdir
, '!a file!')
92 f
= open(subfile
, 'w')
95 os
.utime(subfile
, (1, 2))
98 f
= open(subfile
, 'w')
101 os
.chmod(subfile
, 0o500)
102 os
.utime(subfile
, (1, 2))
104 os
.symlink('/the/symlink/target',
105 os
.path
.join(target
, 'a symlink'))
108 sample
= os
.path
.join(self
.tmp
, 'sample')
110 self
.populate_sample(sample
)
112 digest
= 'sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a'
114 cli
.stores
.lookup(digest
)
119 logger
.setLevel(logging
.ERROR
)
121 cli
.do_add([digest
+ "b", sample
])
125 logger
.setLevel(logging
.WARN
)
127 old_stdout
= sys
.stdout
129 cli
.do_add([digest
, sample
])
130 sys
.stdout
= StringIO()
132 cli
.do_find([digest
])
134 except SystemExit as ex
:
136 cached
= sys
.stdout
.getvalue().strip()
137 assert cached
== cli
.stores
.lookup(digest
)
139 for alg
in [[], ['sha1new']]:
140 sys
.stdout
= StringIO()
142 cli
.do_manifest([cached
] + alg
)
144 except SystemExit as ex
:
146 result
= sys
.stdout
.getvalue()
147 assert 'MyFile' in result
148 assert result
.split('\n')[-2] == digest
151 sys
.stdout
= StringIO()
152 cli
.do_verify([cached
, digest
])
153 cli
.do_verify([cached
])
154 cli
.do_verify([digest
])
157 cli
.do_audit([os
.path
.dirname(cached
)])
160 os
.chmod(cached
, 0o700)
161 open(os
.path
.join(cached
, 'hacked'), 'w').close()
164 sys
.stdout
= StringIO()
166 cli
.do_verify([cached
, digest
])
168 except SystemExit as ex
:
170 result
= sys
.stdout
.getvalue()
171 sys
.stdout
= old_stdout
172 assert 'Cached item does NOT verify' in result
175 sys
.stdout
= StringIO()
177 cli
.do_audit([os
.path
.dirname(cached
)])
178 except SystemExit as ex
:
180 result
= sys
.stdout
.getvalue()
181 sys
.stdout
= old_stdout
182 assert 'Cached item does NOT verify' in result
187 old_stdout
= sys
.stdout
188 sys
.stdout
= StringIO()
190 result
= sys
.stdout
.getvalue()
191 assert 'User store' in result
193 sys
.stdout
= old_stdout
195 def testAddArchive(self
):
197 digest
= 'sha1new=290eb133e146635fe37713fd58174324a16d595f'
200 cli
.stores
.lookup(digest
)
205 cli
.do_add([digest
, os
.path
.join(mydir
, 'HelloWorld.tgz')])
206 cli
.do_add([digest
, os
.path
.join(mydir
, 'HelloWorld.tgz')])
207 cli
.stores
.lookup(digest
)
209 def testOptimise(self
):
210 sample
= os
.path
.join(self
.tmp
, 'sample')
212 self
.populate_sample(sample
)
213 self
.store
.add_dir_to_cache('sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a',
216 subfile
= os
.path
.join(sample
, 'My Dir', '!a file!.exe')
217 mtime
= os
.stat(subfile
).st_mtime
218 os
.chmod(subfile
, 0o755)
219 stream
= open(subfile
, 'w')
220 stream
.write('Extra!\n')
222 os
.utime(subfile
, (mtime
, mtime
))
223 self
.store
.add_dir_to_cache('sha1new=40861a33dba4e7c26d37505bd9693511808c0c35',
227 impl_a
= self
.store
.lookup('sha1new=7e3eb25a072988f164bae24d33af69c1814eb99a')
228 impl_b
= self
.store
.lookup('sha1new=40861a33dba4e7c26d37505bd9693511808c0c35')
230 def same_inode(name
):
231 info_a
= os
.lstat(os
.path
.join(impl_a
, name
))
232 info_b
= os
.lstat(os
.path
.join(impl_b
, name
))
233 return info_a
.st_ino
== info_b
.st_ino
235 assert not same_inode('My Dir/!a file!')
236 assert not same_inode('My Dir/!a file!.exe')
238 old_stdout
= sys
.stdout
239 sys
.stdout
= StringIO()
241 cli
.do_optimise([self
.store
.dir])
242 got
= sys
.stdout
.getvalue()
244 sys
.stdout
= old_stdout
245 assert 'Space freed up : 15 bytes' in got
247 old_stdout
= sys
.stdout
248 sys
.stdout
= StringIO()
250 cli
.do_optimise([self
.store
.dir])
251 got
= sys
.stdout
.getvalue()
253 sys
.stdout
= old_stdout
254 assert 'No duplicates found; no changes made.' in got
256 assert same_inode('My Dir/!a file!')
257 assert not same_inode('My Dir/!a file!.exe')
260 sha1
= manifest
.get_algorithm('sha1')
261 sha1new
= manifest
.get_algorithm('sha1new')
262 source
= os
.path
.join(self
.tmp
, 'badname')
265 self
.populate_sample(source
)
267 lines
= list(sha1new
.generate_manifest(source
))
268 self
.assertEqual(['F f7ff9e8b7bb2e09b70935a5d785e0cc5d9d0abf0 2 5 MyFile',
269 'S 570b0ce957ab43e774c82fca0ea3873fc452278b 19 a symlink',
271 'F 0236ef92e1e37c57f0eb161e7e2f8b6a8face705 2 10 !a file!',
272 'X b4ab02f2c791596a980fd35f51f5d92ee0b4705c 2 10 !a file!.exe'],
274 digest
= sha1
.getID(manifest
.add_manifest_file(source
, sha1
))
276 copy
= tempfile
.mktemp()
279 # Source must be in the form alg=value
281 cli
.do_copy([source
, copy
])
283 except BadDigest
as ex
:
284 assert 'badname' in str(ex
)
285 source
, badname
= os
.path
.join(self
.tmp
, digest
), source
286 os
.rename(badname
, source
)
288 # Can't copy sha1 implementations (unsafe)
290 cli
.do_copy([source
, copy
])
291 except SafeException
as ex
:
292 assert 'sha1' in str(ex
)
294 # Already have a .manifest
296 manifest
.add_manifest_file(source
, sha1new
)
298 except SafeException
as ex
:
299 assert '.manifest' in str(ex
)
301 os
.chmod(source
, 0o700)
302 os
.unlink(os
.path
.join(source
, '.manifest'))
305 digest
= sha1new
.getID(manifest
.add_manifest_file(source
, sha1new
))
306 source
, badname
= os
.path
.join(self
.tmp
, digest
), source
307 os
.rename(badname
, source
)
309 cli
.do_copy([source
, copy
])
311 self
.assertEqual('Hello', open(os
.path
.join(copy
, digest
, 'MyFile')).read())
313 support
.ro_rmtree(copy
)
315 if __name__
== '__main__':