Exceptions raised during renaming in rotating file handlers are now passed to handleE...
[python.git] / Lib / test / test_tarfile.py
blob98349c4ebfd573ee850ef8c4d74266f341dd0d41
1 import sys
2 import os
3 import shutil
4 import tempfile
6 import unittest
7 import tarfile
9 from test import test_support
# Check for our compression modules.  Each name is bound to None when
# the module is unusable so the rest of the file can test it for truth.
try:
    import gzip
except ImportError:
    gzip = None
else:
    # A gzip module without GzipFile is treated as missing.
    if not hasattr(gzip, "GzipFile"):
        gzip = None

try:
    import bz2
except ImportError:
    bz2 = None
def path(path):
    """Return the location of test data file *path*.

    The lookup is delegated to test_support.findfile().
    """
    located = test_support.findfile(path)
    return located
# Uncompressed reference archive shipped with the test suite.
testtar = path("testtar.tar")
# Scratch directory used for extracted members and generated files.
tempdir = os.path.join(tempfile.gettempdir(),
                       os.extsep.join(("testtar", "dir")))
# Scratch file name used for archives written by the tests.
tempname = test_support.TESTFN
# Number of members ReadTest.test() expects to find in the archive.
membercount = 10
def tarname(comp=""):
    """Return the path of the test archive for compression *comp*.

    With an empty *comp* the uncompressed ``testtar`` fixture is returned
    unchanged.  Otherwise the result names the compressed copy inside the
    scratch directory: the base name of ``testtar`` followed by
    ``os.extsep`` and *comp*.
    """
    if not comp:
        return testtar
    # Join only the base name.  os.path.join() throws away every component
    # that precedes an absolute one, so joining the full ``testtar`` path
    # would put the compressed copy next to the original fixture (or, for a
    # relative path with directories, into a sub-directory of the scratch
    # directory that nobody creates).
    compressed = "%s%s%s" % (os.path.basename(testtar), os.extsep, comp)
    # dirname() creates the scratch directory on demand, so the returned
    # path is always writable by the caller.
    return os.path.join(dirname(), compressed)
def dirname():
    """Return the scratch directory, creating it first if it is missing."""
    if os.path.exists(tempdir):
        return tempdir
    os.mkdir(tempdir)
    return tempdir
def tmpname():
    """Return the scratch file name (the module-level ``tempname``)."""
    return tempname
class BaseTest(unittest.TestCase):
    """Common fixture: open the test archive before each test, close it after.

    Subclasses select the archive flavour through three class attributes
    which are concatenated into the mode string given to tarfile.open().
    """
    # Compression suffix; also selects the fixture via tarname().
    comp = ''
    # Access mode.
    mode = 'r'
    # Separator between access mode and compression.
    sep = ':'

    def setUp(self):
        """Open the archive for ``comp`` using mode ``mode + sep + comp``."""
        open_mode = "".join((self.mode, self.sep, self.comp))
        archive_path = tarname(self.comp)
        self.tar = tarfile.open(archive_path, open_mode)

    def tearDown(self):
        """Close the archive opened by setUp()."""
        self.tar.close()
class ReadTest(BaseTest):
    """Read-side tests run against the archive opened by BaseTest.setUp().

    Tests that need seekable member objects do nothing when the archive
    is opened with the ``"|"`` separator (``sep == "|"``).
    """

    def _read_extracted(self, filename, mode, lines=False):
        """Extract *filename* to the scratch directory and read it back.

        The extracted file is opened with *mode*.  Its content is returned
        as a list of lines when *lines* is true, otherwise as one string.
        The file object is always closed before returning.
        """
        self.tar.extract(filename, dirname())
        fobj = open(os.path.join(dirname(), filename), mode)
        try:
            if lines:
                return fobj.readlines()
            return fobj.read()
        finally:
            fobj.close()

    def test(self):
        """Test member extraction."""
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            try:
                self.assert_(len(f.read()) == tarinfo.size,
                             "size read does not match expected size")
            finally:
                f.close()

        self.assert_(members == membercount,
                     "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction."""
        if self.sep == "|":
            return
        f1 = self.tar.extractfile("S-SPARSE")
        try:
            f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
            try:
                self.assert_(f1.read() == f2.read(),
                             "_FileObject failed on sparse file member")
            finally:
                f2.close()
        finally:
            f1.close()

    def test_readlines(self):
        """Test readlines() method of _FileObject."""
        if self.sep == "|":
            return
        filename = "0-REGTYPE-TEXT"
        lines1 = self._read_extracted(filename, "rU", lines=True)
        fobj = self.tar.extractfile(filename)
        try:
            lines2 = fobj.readlines()
        finally:
            fobj.close()
        self.assert_(lines1 == lines2,
                     "_FileObject.readline() does not work correctly")

    def test_iter(self):
        """Test iteration over ExFileObject."""
        if self.sep == "|":
            return
        filename = "0-REGTYPE-TEXT"
        lines1 = self._read_extracted(filename, "rU", lines=True)
        fobj = self.tar.extractfile(filename)
        try:
            lines2 = [line for line in fobj]
        finally:
            fobj.close()
        self.assert_(lines1 == lines2,
                     "ExFileObject iteration does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading."""
        if self.sep == "|":
            return
        filename = "0-REGTYPE"
        data = self._read_extracted(filename, "rb")

        tarinfo = self.tar.getmember(filename)
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Consume the whole member first so that the seek(0) below
            # starts from the end of the file.
            fobj.read()
            fobj.seek(0)
            self.assert_(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assert_(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assert_(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assert_(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assert_(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assert_(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assert_(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assert_(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assert_(s1 == s2,
                         "readlines() after seek failed")
        finally:
            fobj.close()

    def test_old_dirtype(self):
        """Test old style dirtype member (bug #1336623)."""
        # Old tars create directory members using a REGTYPE
        # header with a "/" appended to the filename field.

        # Create an old tar style directory entry.
        filename = tmpname()
        tarinfo = tarfile.TarInfo("directory/")
        tarinfo.type = tarfile.REGTYPE

        # The header is binary data, so write it in binary mode to keep
        # platforms with newline translation from altering it.
        fobj = open(filename, "wb")
        try:
            fobj.write(tarinfo.tobuf())
        finally:
            fobj.close()

        try:
            # Test if it is still a directory entry when
            # read back.
            tar = tarfile.open(filename)
            try:
                tarinfo = tar.getmembers()[0]
            finally:
                tar.close()

            self.assert_(tarinfo.type == tarfile.DIRTYPE)
            self.assert_(tarinfo.name.endswith("/"))
        finally:
            # Best-effort cleanup: only filesystem errors are ignored, so
            # KeyboardInterrupt and programming errors still propagate.
            try:
                os.unlink(filename)
            except OSError:
                pass
class ReadStreamTest(ReadTest):
    """Run the ReadTest suite on an archive opened with the "|" separator."""
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        # After the full pass above, going back to the first member
        # requires a backwards seek.
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar."""
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')
        try:
            while True:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assert_(v1.read() == v2.read(),
                             "stream extraction failed")
        finally:
            # The reference archive belongs to this test alone, so release
            # it even when an assertion above fails.
            tar.close()

        stream.close()
class ReadAsteriskTest(ReadTest):
    """Run the ReadTest suite with "*" in place of the compression name."""

    def setUp(self):
        """Open the archive for ``comp`` using mode ``mode + sep + "*"``."""
        open_mode = "".join((self.mode, self.sep, "*"))
        archive_path = tarname(self.comp)
        self.tar = tarfile.open(archive_path, open_mode)
class ReadStreamAsteriskTest(ReadStreamTest):
    """Run the ReadStreamTest suite with "*" in place of the compression name."""

    def setUp(self):
        """Open the archive for ``comp`` using mode ``mode + sep + "*"``."""
        open_mode = "".join((self.mode, self.sep, "*"))
        archive_path = tarname(self.comp)
        self.tar = tarfile.open(archive_path, open_mode)
class WriteTest(BaseTest):
    """Copy regular members from the reference archive into a new archive."""
    mode = 'w'

    def setUp(self):
        """Open the source archive for reading and the destination for writing."""
        write_mode = "".join((self.mode, self.sep, self.comp))
        self.src = tarfile.open(tarname(self.comp), 'r')
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, write_mode)

    def tearDown(self):
        """Close the source archive, then the destination archive."""
        self.src.close()
        self.dst.close()

    def test_posix(self):
        """Copy the members with ``dst.posix`` set to 1."""
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        """Copy the members with ``dst.posix`` set to 0."""
        self.dst.posix = 0
        self._test()

    def test_small(self):
        """Adding one file and closing must leave a non-empty archive."""
        here = os.path.dirname(__file__)
        sample = os.path.join(here, "cfgparser.1")
        self.dst.add(sample)
        self.dst.close()
        written = os.stat(self.dstname).st_size
        self.assertNotEqual(written, 0)

    def _test(self):
        """Add every regular source member to the destination archive.

        When ``dst.posix`` is true, a member whose name is longer than
        tarfile.LENGTH_NAME must be rejected with ValueError.
        """
        for member in self.src:
            if member.isreg():
                fobj = self.src.extractfile(member)
                name_too_long = len(member.name) > tarfile.LENGTH_NAME
                if not (self.dst.posix and name_too_long):
                    self.dst.addfile(member, fobj)
                else:
                    self.assertRaises(ValueError, self.dst.addfile,
                                      member, fobj)
class WriteSize0Test(BaseTest):
    """Check the ``size`` that gettarinfo() reports for various file types."""
    mode = 'w'

    def setUp(self):
        """Open a fresh archive for writing and remember the scratch directory."""
        self.tmpdir = dirname()
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, "w")

    def tearDown(self):
        """Close the archive opened by setUp()."""
        self.dst.close()

    def test_file(self):
        """An empty file has size 0; after writing 3 bytes it has size 3."""
        path = os.path.join(self.tmpdir, "file")
        # Create (or truncate) the file and close it explicitly rather than
        # leaving that to the garbage collector.
        open(path, "w").close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)

        fobj = open(path, "w")
        try:
            fobj.write("aaa")
        finally:
            # Closing flushes the data before gettarinfo() stats the file.
            fobj.close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 3)

    def test_directory(self):
        """A directory member has size 0."""
        path = os.path.join(self.tmpdir, "directory")
        # A directory left behind by an interrupted earlier run would make
        # os.mkdir() fail, so clear it first.
        if os.path.exists(path):
            shutil.rmtree(path)
        os.mkdir(path)
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)

    def test_symlink(self):
        """A symlink member has size 0 (only run where os.symlink exists)."""
        if hasattr(os, "symlink"):
            path = os.path.join(self.tmpdir, "symlink")
            # lexists() also sees a dangling link, which exists() would
            # miss because it follows the link to its absent target.
            if os.path.lexists(path):
                os.remove(path)
            os.symlink("link_target", path)
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)
class WriteStreamTest(WriteTest):
    """Run the WriteTest suite with the "|" separator in the write mode."""
    sep = "|"
class WriteGNULongTest(unittest.TestCase):
    """This testcase checks for correct creation of GNU Longname
       and Longlink extensions.

       It creates a tarfile and adds empty members with either
       long names, long linknames or both and compares the size
       of the tarfile with the expected size.

       It checks for SF bug #812325 in TarFile._create_gnulong().

       While I was writing this testcase, I noticed a second bug
       in the same method:
       Long{names,links} weren't null-terminated which lead to
       bad tarfiles when their length was a multiple of 512. This
       is tested as well.
    """

    # 127 repetitions of an 8-character component: 1016 characters.  The
    # tests append 7, 8 or 9 more to reach 1023, 1024 and 1025.
    _NAME_STEM = "longnam/" * 127
    _LINK_STEM = "longlnk/" * 127

    def setUp(self):
        """Open a new archive for writing with ``posix`` switched off."""
        archive = tarfile.open(tmpname(), "w")
        archive.posix = False
        self.tar = archive

    def tearDown(self):
        """Close the archive opened by setUp()."""
        self.tar.close()

    def _length(self, s):
        """Return len(s) + 1 rounded up to a whole number of 512-byte blocks."""
        needed = len(s) + 1
        return (needed + 511) // 512 * 512

    def _calc_size(self, name, link=None):
        """Return the archive size expected after adding one empty member."""
        # initial tar header
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # gnu longname extended header + longname
            total += 512 + self._length(name)

        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # gnu longlink extended header + longlink
            total += 512 + self._length(link)

        return total

    def _test(self, name, link=None):
        """Add an empty member and compare the archive offset with _calc_size()."""
        member = tarfile.TarInfo(name)
        if link:
            member.linkname = link
            member.type = tarfile.LNKTYPE

        self.tar.addfile(member)

        expected = self._calc_size(name, link)
        actual = self.tar.offset
        self.assertEqual(expected, actual,
                         "GNU longname/longlink creation failed")

    def test_longname_1023(self):
        self._test(self._NAME_STEM + "longnam")

    def test_longname_1024(self):
        self._test(self._NAME_STEM + "longname")

    def test_longname_1025(self):
        self._test(self._NAME_STEM + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._LINK_STEM + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._LINK_STEM + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._LINK_STEM + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._NAME_STEM + "longnam",
                   self._LINK_STEM + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._NAME_STEM + "longname",
                   self._LINK_STEM + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._NAME_STEM + "longname_",
                   self._LINK_STEM + "longlink_")
class ExtractHardlinkTest(BaseTest):
    """Extract a hardlink member after the member it links to."""

    def test_hardlink(self):
        """Test hardlink extraction (bug #857297)."""
        # Prevent errors from being caught
        self.tar.errorlevel = 1

        target_dir = dirname()
        self.tar.extract("0-REGTYPE", target_dir)
        try:
            # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
            self.tar.extract("1-LNKTYPE", target_dir)
        except EnvironmentError:
            import errno
            error = sys.exc_info()[1]
            # Only a missing link target counts as a failure; any other
            # environment error is ignored.
            if error.errno == errno.ENOENT:
                self.fail("hardlink not extracted properly")
class CreateHardlinkTest(BaseTest):
    """Test the creation of LNKTYPE (hardlink) members in an archive.
       In this respect tarfile.py mimics the behaviour of GNU tar: If
       a file has a st_nlink > 1, it will be added a REGTYPE member
       only the first time.
    """

    def setUp(self):
        """Create file ``foo`` in the scratch directory and add it once."""
        self.tar = tarfile.open(tmpname(), "w")

        self.foo = os.path.join(dirname(), "foo")
        self.bar = os.path.join(dirname(), "bar")

        # Start from a clean slate: leftovers would change the link count.
        for stale in (self.foo, self.bar):
            if os.path.exists(stale):
                os.remove(stale)

        # Close the file explicitly so its content is on disk before
        # tar.add() reads it, instead of relying on garbage collection.
        fobj = open(self.foo, "w")
        try:
            fobj.write("foo")
        finally:
            fobj.close()
        self.tar.add(self.foo)

    def test_add_twice(self):
        # If st_nlink == 1 then the same file will be added as
        # REGTYPE every time.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "add file as regular failed")

    def test_add_hardlink(self):
        # If st_nlink > 1 then the same file will be added as
        # LNKTYPE.
        os.link(self.foo, self.bar)
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                         "add file as hardlink failed")

        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                         "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference switched on, a hardlinked file is reported as
        # a regular member.
        self.tar.dereference = True
        os.link(self.foo, self.bar)
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "dereferencing hardlink failed")
# Gzip TestCases: each class reruns its parent suite with comp = "gz".

class ReadTestGzip(ReadTest):
    """ReadTest with comp = "gz"."""
    comp = "gz"


class ReadStreamTestGzip(ReadStreamTest):
    """ReadStreamTest with comp = "gz"."""
    comp = "gz"


class WriteTestGzip(WriteTest):
    """WriteTest with comp = "gz"."""
    comp = "gz"


class WriteStreamTestGzip(WriteStreamTest):
    """WriteStreamTest with comp = "gz"."""
    comp = "gz"


class ReadAsteriskTestGzip(ReadAsteriskTest):
    """ReadAsteriskTest with comp = "gz"."""
    comp = "gz"


class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
    """ReadStreamAsteriskTest with comp = "gz"."""
    comp = "gz"
# Filemode test cases

class FileModeTest(unittest.TestCase):
    """Check how tarfile.filemode() renders permission bits."""

    def test_modes(self):
        """Each octal mode must map to its expected mode string."""
        # Modes are given as octal digit strings and converted with
        # int(..., 8).
        cases = (
            (int("0755", 8), '-rwxr-xr-x'),
            (int("07111", 8), '---s--s--t'),
        )
        for mode, rendered in cases:
            self.assertEqual(tarfile.filemode(mode), rendered)
if bz2:
    # Bzip2 TestCases: defined only when the bz2 module could be
    # imported.  Each class reruns its parent suite with comp = "bz2".

    class ReadTestBzip2(ReadTestGzip):
        """ReadTestGzip with comp = "bz2"."""
        comp = "bz2"

    class ReadStreamTestBzip2(ReadStreamTestGzip):
        """ReadStreamTestGzip with comp = "bz2"."""
        comp = "bz2"

    class WriteTestBzip2(WriteTest):
        """WriteTest with comp = "bz2"."""
        comp = "bz2"

    class WriteStreamTestBzip2(WriteStreamTestGzip):
        """WriteStreamTestGzip with comp = "bz2"."""
        comp = "bz2"

    class ReadAsteriskTestBzip2(ReadAsteriskTest):
        """ReadAsteriskTest with comp = "bz2"."""
        comp = "bz2"

    class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
        """ReadStreamAsteriskTest with comp = "bz2"."""
        comp = "bz2"
# If importing gzip failed, discard the Gzip TestCases.  All six classes
# are removed so that none of them remains in the module namespace.  The
# Bzip2 classes defined above keep their own references to their bases
# and are unaffected.
if not gzip:
    del ReadTestGzip
    del ReadStreamTestGzip
    del WriteTestGzip
    del WriteStreamTestGzip
    del ReadAsteriskTestGzip
    del ReadStreamAsteriskTestGzip
def test_main():
    """Build the compressed fixtures, run all applicable suites, clean up.

    The gzip and bz2 copies of the reference archive are created only
    when the corresponding module is available.  Everything created here
    or by the tests (compressed copies, scratch directory, scratch file)
    is removed afterwards, whether or not the run succeeded.
    """
    tests = [
        FileModeTest,
        ReadTest,
        ReadStreamTest,
        ReadAsteriskTest,
        ReadStreamAsteriskTest,
        WriteTest,
        WriteSize0Test,
        WriteStreamTest,
        WriteGNULongTest,
    ]

    if hasattr(os, "link"):
        tests.append(ExtractHardlinkTest)
        tests.append(CreateHardlinkTest)

    if gzip:
        tests.extend([
            ReadTestGzip, ReadStreamTestGzip,
            WriteTestGzip, WriteStreamTestGzip,
            ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip
        ])

    if bz2:
        tests.extend([
            ReadTestBzip2, ReadStreamTestBzip2,
            WriteTestBzip2, WriteStreamTestBzip2,
            ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2
        ])

    try:
        # The fixtures are built inside the try block so that a failure
        # while writing one of them still triggers the cleanup below.
        if gzip or bz2:
            source = open(tarname(), "rb")
            try:
                data = source.read()
            finally:
                source.close()

        if gzip:
            # create testtar.tar.gz
            target = gzip.open(tarname("gz"), "wb")
            try:
                target.write(data)
            finally:
                # Closing completes the compressed stream.
                target.close()

        if bz2:
            # create testtar.tar.bz2
            target = bz2.BZ2File(tarname("bz2"), "wb")
            try:
                target.write(data)
            finally:
                target.close()

        test_support.run_unittest(*tests)
    finally:
        # Remove only what actually exists, so a failure part-way through
        # fixture creation does not raise a second error here.
        for module, comp in ((gzip, "gz"), (bz2, "bz2")):
            if module and os.path.exists(tarname(comp)):
                os.remove(tarname(comp))
        if os.path.exists(dirname()):
            shutil.rmtree(dirname())
        if os.path.exists(tmpname()):
            os.remove(tmpname())
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()