1 # -*- coding: iso-8859-15 -*-
8 from hashlib
import md5
14 from test
import test_support
16 # Check for our compression modules.
20 except (ImportError, AttributeError):
28 return md5(data
).hexdigest()
30 TEMPDIR
= os
.path
.abspath(test_support
.TESTFN
)
31 tarname
= test_support
.findfile("testtar.tar")
32 gzipname
= os
.path
.join(TEMPDIR
, "testtar.tar.gz")
33 bz2name
= os
.path
.join(TEMPDIR
, "testtar.tar.bz2")
34 tmpname
= os
.path
.join(TEMPDIR
, "tmp.tar")
36 md5_regtype
= "65f477c818ad9e15f7feab0c6d37742f"
37 md5_sparse
= "a54fbc4ca4f4399a90e1b27164012fc6"
40 class ReadTest(unittest
.TestCase
):
46 self
.tar
= tarfile
.open(self
.tarname
, mode
=self
.mode
, encoding
="iso8859-1")
52 class UstarReadTest(ReadTest
):
54 def test_fileobj_regular_file(self
):
55 tarinfo
= self
.tar
.getmember("ustar/regtype")
56 fobj
= self
.tar
.extractfile(tarinfo
)
58 self
.assertTrue((len(data
), md5sum(data
)) == (tarinfo
.size
, md5_regtype
),
59 "regular file extraction failed")
61 def test_fileobj_readlines(self
):
62 self
.tar
.extract("ustar/regtype", TEMPDIR
)
63 tarinfo
= self
.tar
.getmember("ustar/regtype")
64 fobj1
= open(os
.path
.join(TEMPDIR
, "ustar/regtype"), "rU")
65 fobj2
= self
.tar
.extractfile(tarinfo
)
67 lines1
= fobj1
.readlines()
68 lines2
= fobj2
.readlines()
69 self
.assertTrue(lines1
== lines2
,
70 "fileobj.readlines() failed")
71 self
.assertTrue(len(lines2
) == 114,
72 "fileobj.readlines() failed")
73 self
.assertTrue(lines2
[83] == \
74 "I will gladly admit that Python is not the fastest running scripting language.\n",
75 "fileobj.readlines() failed")
77 def test_fileobj_iter(self
):
78 self
.tar
.extract("ustar/regtype", TEMPDIR
)
79 tarinfo
= self
.tar
.getmember("ustar/regtype")
80 fobj1
= open(os
.path
.join(TEMPDIR
, "ustar/regtype"), "rU")
81 fobj2
= self
.tar
.extractfile(tarinfo
)
82 lines1
= fobj1
.readlines()
83 lines2
= [line
for line
in fobj2
]
84 self
.assertTrue(lines1
== lines2
,
85 "fileobj.__iter__() failed")
87 def test_fileobj_seek(self
):
88 self
.tar
.extract("ustar/regtype", TEMPDIR
)
89 fobj
= open(os
.path
.join(TEMPDIR
, "ustar/regtype"), "rb")
93 tarinfo
= self
.tar
.getmember("ustar/regtype")
94 fobj
= self
.tar
.extractfile(tarinfo
)
98 self
.assertTrue(0 == fobj
.tell(),
99 "seek() to file's start failed")
101 self
.assertTrue(2048 == fobj
.tell(),
102 "seek() to absolute position failed")
104 self
.assertTrue(1024 == fobj
.tell(),
105 "seek() to negative relative position failed")
107 self
.assertTrue(2048 == fobj
.tell(),
108 "seek() to positive relative position failed")
110 self
.assertTrue(s
== data
[2048:2058],
111 "read() after seek failed")
113 self
.assertTrue(tarinfo
.size
== fobj
.tell(),
114 "seek() to file's end failed")
115 self
.assertTrue(fobj
.read() == "",
116 "read() at file's end did not return empty string")
117 fobj
.seek(-tarinfo
.size
, 2)
118 self
.assertTrue(0 == fobj
.tell(),
119 "relative seek() to file's start failed")
121 s1
= fobj
.readlines()
123 s2
= fobj
.readlines()
124 self
.assertTrue(s1
== s2
,
125 "readlines() after seek failed")
127 self
.assertTrue(len(fobj
.readline()) == fobj
.tell(),
128 "tell() after readline() failed")
130 self
.assertTrue(len(fobj
.readline()) + 512 == fobj
.tell(),
131 "tell() after seek() and readline() failed")
133 line
= fobj
.readline()
134 self
.assertTrue(fobj
.read() == data
[len(line
):],
135 "read() after readline() failed")
139 class MiscReadTest(ReadTest
):
141 def test_no_name_argument(self
):
142 fobj
= open(self
.tarname
, "rb")
143 tar
= tarfile
.open(fileobj
=fobj
, mode
=self
.mode
)
144 self
.assertEqual(tar
.name
, os
.path
.abspath(fobj
.name
))
146 def test_no_name_attribute(self
):
147 data
= open(self
.tarname
, "rb").read()
148 fobj
= StringIO
.StringIO(data
)
149 self
.assertRaises(AttributeError, getattr, fobj
, "name")
150 tar
= tarfile
.open(fileobj
=fobj
, mode
=self
.mode
)
151 self
.assertEqual(tar
.name
, None)
153 def test_empty_name_attribute(self
):
154 data
= open(self
.tarname
, "rb").read()
155 fobj
= StringIO
.StringIO(data
)
157 tar
= tarfile
.open(fileobj
=fobj
, mode
=self
.mode
)
158 self
.assertEqual(tar
.name
, None)
160 def test_fileobj_with_offset(self
):
161 # Skip the first member and store values from the second member
163 tar
= tarfile
.open(self
.tarname
, mode
=self
.mode
)
168 data
= tar
.extractfile(t
).read()
171 # Open the testtar and seek to the offset of the second member.
172 if self
.mode
.endswith(":gz"):
173 _open
= gzip
.GzipFile
174 elif self
.mode
.endswith(":bz2"):
178 fobj
= _open(self
.tarname
, "rb")
181 # Test if the tarfile starts with the second member.
182 tar
= tar
.open(self
.tarname
, mode
="r:", fileobj
=fobj
)
184 self
.assertEqual(t
.name
, name
)
185 # Read to the end of fileobj and test if seeking back to the
188 self
.assertEqual(tar
.extractfile(t
).read(), data
,
189 "seek back did not work")
192 def test_fail_comp(self
):
193 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
194 if self
.mode
== "r:":
196 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tarname
, self
.mode
)
197 fobj
= open(tarname
, "rb")
198 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, fileobj
=fobj
, mode
=self
.mode
)
200 def test_v7_dirtype(self
):
201 # Test old style dirtype member (bug #1336623):
202 # Old V7 tars create directory members using an AREGTYPE
203 # header with a "/" appended to the filename field.
204 tarinfo
= self
.tar
.getmember("misc/dirtype-old-v7")
205 self
.assertTrue(tarinfo
.type == tarfile
.DIRTYPE
,
208 def test_xstar_type(self
):
209 # The xstar format stores extra atime and ctime fields inside the
210 # space reserved for the prefix field. The prefix field must be
211 # ignored in this case, otherwise it will mess up the name.
213 self
.tar
.getmember("misc/regtype-xstar")
215 self
.fail("failed to find misc/regtype-xstar (mangled prefix?)")
217 def test_check_members(self
):
218 for tarinfo
in self
.tar
:
219 self
.assertTrue(int(tarinfo
.mtime
) == 07606136617,
220 "wrong mtime for %s" % tarinfo
.name
)
221 if not tarinfo
.name
.startswith("ustar/"):
223 self
.assertTrue(tarinfo
.uname
== "tarfile",
224 "wrong uname for %s" % tarinfo
.name
)
226 def test_find_members(self
):
227 self
.assertTrue(self
.tar
.getmembers()[-1].name
== "misc/eof",
228 "could not find all members")
230 def test_extract_hardlink(self
):
231 # Test hardlink extraction (e.g. bug #857297).
232 tar
= tarfile
.open(tarname
, errorlevel
=1, encoding
="iso8859-1")
234 tar
.extract("ustar/regtype", TEMPDIR
)
236 tar
.extract("ustar/lnktype", TEMPDIR
)
237 except EnvironmentError, e
:
238 if e
.errno
== errno
.ENOENT
:
239 self
.fail("hardlink not extracted properly")
241 data
= open(os
.path
.join(TEMPDIR
, "ustar/lnktype"), "rb").read()
242 self
.assertEqual(md5sum(data
), md5_regtype
)
245 tar
.extract("ustar/symtype", TEMPDIR
)
246 except EnvironmentError, e
:
247 if e
.errno
== errno
.ENOENT
:
248 self
.fail("symlink not extracted properly")
250 data
= open(os
.path
.join(TEMPDIR
, "ustar/symtype"), "rb").read()
251 self
.assertEqual(md5sum(data
), md5_regtype
)
253 def test_extractall(self
):
254 # Test if extractall() correctly restores directory permissions
255 # and times (see issue1735).
256 tar
= tarfile
.open(tarname
, encoding
="iso8859-1")
257 directories
= [t
for t
in tar
if t
.isdir()]
258 tar
.extractall(TEMPDIR
, directories
)
259 for tarinfo
in directories
:
260 path
= os
.path
.join(TEMPDIR
, tarinfo
.name
)
261 if sys
.platform
!= "win32":
262 # Win32 has no support for fine grained permissions.
263 self
.assertEqual(tarinfo
.mode
& 0777, os
.stat(path
).st_mode
& 0777)
264 self
.assertEqual(tarinfo
.mtime
, os
.path
.getmtime(path
))
268 class StreamReadTest(ReadTest
):
272 def test_fileobj_regular_file(self
):
273 tarinfo
= self
.tar
.next() # get "regtype" (can't use getmember)
274 fobj
= self
.tar
.extractfile(tarinfo
)
276 self
.assertTrue((len(data
), md5sum(data
)) == (tarinfo
.size
, md5_regtype
),
277 "regular file extraction failed")
279 def test_provoke_stream_error(self
):
280 tarinfos
= self
.tar
.getmembers()
281 f
= self
.tar
.extractfile(tarinfos
[0]) # read the first member
282 self
.assertRaises(tarfile
.StreamError
, f
.read
)
284 def test_compare_members(self
):
285 tar1
= tarfile
.open(tarname
, encoding
="iso8859-1")
293 self
.assertTrue(t2
is not None, "stream.next() failed.")
295 if t2
.islnk() or t2
.issym():
296 self
.assertRaises(tarfile
.StreamError
, tar2
.extractfile
, t2
)
299 v1
= tar1
.extractfile(t1
)
300 v2
= tar2
.extractfile(t2
)
303 self
.assertTrue(v2
is not None, "stream.extractfile() failed")
304 self
.assertTrue(v1
.read() == v2
.read(), "stream extraction failed")
309 class DetectReadTest(unittest
.TestCase
):
311 def _testfunc_file(self
, name
, mode
):
313 tarfile
.open(name
, mode
)
314 except tarfile
.ReadError
:
317 def _testfunc_fileobj(self
, name
, mode
):
319 tarfile
.open(name
, mode
, fileobj
=open(name
, "rb"))
320 except tarfile
.ReadError
:
323 def _test_modes(self
, testfunc
):
324 testfunc(tarname
, "r")
325 testfunc(tarname
, "r:")
326 testfunc(tarname
, "r:*")
327 testfunc(tarname
, "r|")
328 testfunc(tarname
, "r|*")
331 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tarname
, mode
="r:gz")
332 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tarname
, mode
="r|gz")
333 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, gzipname
, mode
="r:")
334 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, gzipname
, mode
="r|")
336 testfunc(gzipname
, "r")
337 testfunc(gzipname
, "r:*")
338 testfunc(gzipname
, "r:gz")
339 testfunc(gzipname
, "r|*")
340 testfunc(gzipname
, "r|gz")
343 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tarname
, mode
="r:bz2")
344 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tarname
, mode
="r|bz2")
345 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, bz2name
, mode
="r:")
346 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, bz2name
, mode
="r|")
348 testfunc(bz2name
, "r")
349 testfunc(bz2name
, "r:*")
350 testfunc(bz2name
, "r:bz2")
351 testfunc(bz2name
, "r|*")
352 testfunc(bz2name
, "r|bz2")
354 def test_detect_file(self
):
355 self
._test
_modes
(self
._testfunc
_file
)
357 def test_detect_fileobj(self
):
358 self
._test
_modes
(self
._testfunc
_fileobj
)
361 class MemberReadTest(ReadTest
):
363 def _test_member(self
, tarinfo
, chksum
=None, **kwargs
):
364 if chksum
is not None:
365 self
.assertTrue(md5sum(self
.tar
.extractfile(tarinfo
).read()) == chksum
,
366 "wrong md5sum for %s" % tarinfo
.name
)
368 kwargs
["mtime"] = 07606136617
371 if "old-v7" not in tarinfo
.name
:
372 # V7 tar can't handle alphabetic owners.
373 kwargs
["uname"] = "tarfile"
374 kwargs
["gname"] = "tarfile"
375 for k
, v
in kwargs
.iteritems():
376 self
.assertTrue(getattr(tarinfo
, k
) == v
,
377 "wrong value in %s field of %s" % (k
, tarinfo
.name
))
379 def test_find_regtype(self
):
380 tarinfo
= self
.tar
.getmember("ustar/regtype")
381 self
._test
_member
(tarinfo
, size
=7011, chksum
=md5_regtype
)
383 def test_find_conttype(self
):
384 tarinfo
= self
.tar
.getmember("ustar/conttype")
385 self
._test
_member
(tarinfo
, size
=7011, chksum
=md5_regtype
)
387 def test_find_dirtype(self
):
388 tarinfo
= self
.tar
.getmember("ustar/dirtype")
389 self
._test
_member
(tarinfo
, size
=0)
391 def test_find_dirtype_with_size(self
):
392 tarinfo
= self
.tar
.getmember("ustar/dirtype-with-size")
393 self
._test
_member
(tarinfo
, size
=255)
395 def test_find_lnktype(self
):
396 tarinfo
= self
.tar
.getmember("ustar/lnktype")
397 self
._test
_member
(tarinfo
, size
=0, linkname
="ustar/regtype")
399 def test_find_symtype(self
):
400 tarinfo
= self
.tar
.getmember("ustar/symtype")
401 self
._test
_member
(tarinfo
, size
=0, linkname
="regtype")
403 def test_find_blktype(self
):
404 tarinfo
= self
.tar
.getmember("ustar/blktype")
405 self
._test
_member
(tarinfo
, size
=0, devmajor
=3, devminor
=0)
407 def test_find_chrtype(self
):
408 tarinfo
= self
.tar
.getmember("ustar/chrtype")
409 self
._test
_member
(tarinfo
, size
=0, devmajor
=1, devminor
=3)
411 def test_find_fifotype(self
):
412 tarinfo
= self
.tar
.getmember("ustar/fifotype")
413 self
._test
_member
(tarinfo
, size
=0)
415 def test_find_sparse(self
):
416 tarinfo
= self
.tar
.getmember("ustar/sparse")
417 self
._test
_member
(tarinfo
, size
=86016, chksum
=md5_sparse
)
419 def test_find_umlauts(self
):
420 tarinfo
= self
.tar
.getmember("ustar/umlauts-ÄÖÜäöüß")
421 self
._test
_member
(tarinfo
, size
=7011, chksum
=md5_regtype
)
423 def test_find_ustar_longname(self
):
424 name
= "ustar/" + "12345/" * 39 + "1234567/longname"
425 self
.assertTrue(name
in self
.tar
.getnames())
427 def test_find_regtype_oldv7(self
):
428 tarinfo
= self
.tar
.getmember("misc/regtype-old-v7")
429 self
._test
_member
(tarinfo
, size
=7011, chksum
=md5_regtype
)
431 def test_find_pax_umlauts(self
):
432 self
.tar
= tarfile
.open(self
.tarname
, mode
=self
.mode
, encoding
="iso8859-1")
433 tarinfo
= self
.tar
.getmember("pax/umlauts-ÄÖÜäöüß")
434 self
._test
_member
(tarinfo
, size
=7011, chksum
=md5_regtype
)
437 class LongnameTest(ReadTest
):
439 def test_read_longname(self
):
440 # Test reading of longname (bug #1471427).
441 longname
= self
.subdir
+ "/" + "123/" * 125 + "longname"
443 tarinfo
= self
.tar
.getmember(longname
)
445 self
.fail("longname not found")
446 self
.assertTrue(tarinfo
.type != tarfile
.DIRTYPE
, "read longname as dirtype")
448 def test_read_longlink(self
):
449 longname
= self
.subdir
+ "/" + "123/" * 125 + "longname"
450 longlink
= self
.subdir
+ "/" + "123/" * 125 + "longlink"
452 tarinfo
= self
.tar
.getmember(longlink
)
454 self
.fail("longlink not found")
455 self
.assertTrue(tarinfo
.linkname
== longname
, "linkname wrong")
457 def test_truncated_longname(self
):
458 longname
= self
.subdir
+ "/" + "123/" * 125 + "longname"
459 tarinfo
= self
.tar
.getmember(longname
)
460 offset
= tarinfo
.offset
461 self
.tar
.fileobj
.seek(offset
)
462 fobj
= StringIO
.StringIO(self
.tar
.fileobj
.read(3 * 512))
463 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, name
="foo.tar", fileobj
=fobj
)
465 def test_header_offset(self
):
466 # Test if the start offset of the TarInfo object includes
467 # the preceding extended header.
468 longname
= self
.subdir
+ "/" + "123/" * 125 + "longname"
469 offset
= self
.tar
.getmember(longname
).offset
472 tarinfo
= tarfile
.TarInfo
.frombuf(fobj
.read(512))
473 self
.assertEqual(tarinfo
.type, self
.longnametype
)
476 class GNUReadTest(LongnameTest
):
479 longnametype
= tarfile
.GNUTYPE_LONGNAME
481 def test_sparse_file(self
):
482 tarinfo1
= self
.tar
.getmember("ustar/sparse")
483 fobj1
= self
.tar
.extractfile(tarinfo1
)
484 tarinfo2
= self
.tar
.getmember("gnu/sparse")
485 fobj2
= self
.tar
.extractfile(tarinfo2
)
486 self
.assertTrue(fobj1
.read() == fobj2
.read(),
487 "sparse file extraction failed")
490 class PaxReadTest(LongnameTest
):
493 longnametype
= tarfile
.XHDTYPE
495 def test_pax_global_headers(self
):
496 tar
= tarfile
.open(tarname
, encoding
="iso8859-1")
498 tarinfo
= tar
.getmember("pax/regtype1")
499 self
.assertEqual(tarinfo
.uname
, "foo")
500 self
.assertEqual(tarinfo
.gname
, "bar")
501 self
.assertEqual(tarinfo
.pax_headers
.get("VENDOR.umlauts"), u
"ÄÖÜäöüß")
503 tarinfo
= tar
.getmember("pax/regtype2")
504 self
.assertEqual(tarinfo
.uname
, "")
505 self
.assertEqual(tarinfo
.gname
, "bar")
506 self
.assertEqual(tarinfo
.pax_headers
.get("VENDOR.umlauts"), u
"ÄÖÜäöüß")
508 tarinfo
= tar
.getmember("pax/regtype3")
509 self
.assertEqual(tarinfo
.uname
, "tarfile")
510 self
.assertEqual(tarinfo
.gname
, "tarfile")
511 self
.assertEqual(tarinfo
.pax_headers
.get("VENDOR.umlauts"), u
"ÄÖÜäöüß")
513 def test_pax_number_fields(self
):
514 # All following number fields are read from the pax header.
515 tar
= tarfile
.open(tarname
, encoding
="iso8859-1")
516 tarinfo
= tar
.getmember("pax/regtype4")
517 self
.assertEqual(tarinfo
.size
, 7011)
518 self
.assertEqual(tarinfo
.uid
, 123)
519 self
.assertEqual(tarinfo
.gid
, 123)
520 self
.assertEqual(tarinfo
.mtime
, 1041808783.0)
521 self
.assertEqual(type(tarinfo
.mtime
), float)
522 self
.assertEqual(float(tarinfo
.pax_headers
["atime"]), 1041808783.0)
523 self
.assertEqual(float(tarinfo
.pax_headers
["ctime"]), 1041808783.0)
526 class WriteTestBase(unittest
.TestCase
):
527 # Put all write tests in here that are supposed to be tested
528 # in all possible mode combinations.
530 def test_fileobj_no_close(self
):
531 fobj
= StringIO
.StringIO()
532 tar
= tarfile
.open(fileobj
=fobj
, mode
=self
.mode
)
533 tar
.addfile(tarfile
.TarInfo("foo"))
535 self
.assertTrue(fobj
.closed
is False, "external fileobjs must never closed")
538 class WriteTest(WriteTestBase
):
542 def test_100_char_name(self
):
543 # The name field in a tar header stores strings of at most 100 chars.
544 # If a string is shorter than 100 chars it has to be padded with '\0',
545 # which implies that a string of exactly 100 chars is stored without
547 name
= "0123456789" * 10
548 tar
= tarfile
.open(tmpname
, self
.mode
)
549 t
= tarfile
.TarInfo(name
)
553 tar
= tarfile
.open(tmpname
)
554 self
.assertTrue(tar
.getnames()[0] == name
,
555 "failed to store 100 char filename")
558 def test_tar_size(self
):
559 # Test for bug #1013882.
560 tar
= tarfile
.open(tmpname
, self
.mode
)
561 path
= os
.path
.join(TEMPDIR
, "file")
562 fobj
= open(path
, "wb")
567 self
.assertTrue(os
.path
.getsize(tmpname
) > 0,
570 # The test_*_size tests test for bug #1167128.
571 def test_file_size(self
):
572 tar
= tarfile
.open(tmpname
, self
.mode
)
574 path
= os
.path
.join(TEMPDIR
, "file")
575 fobj
= open(path
, "wb")
577 tarinfo
= tar
.gettarinfo(path
)
578 self
.assertEqual(tarinfo
.size
, 0)
580 fobj
= open(path
, "wb")
583 tarinfo
= tar
.gettarinfo(path
)
584 self
.assertEqual(tarinfo
.size
, 3)
588 def test_directory_size(self
):
589 path
= os
.path
.join(TEMPDIR
, "directory")
592 tar
= tarfile
.open(tmpname
, self
.mode
)
593 tarinfo
= tar
.gettarinfo(path
)
594 self
.assertEqual(tarinfo
.size
, 0)
598 def test_link_size(self
):
599 if hasattr(os
, "link"):
600 link
= os
.path
.join(TEMPDIR
, "link")
601 target
= os
.path
.join(TEMPDIR
, "link_target")
602 open(target
, "wb").close()
603 os
.link(target
, link
)
605 tar
= tarfile
.open(tmpname
, self
.mode
)
606 tarinfo
= tar
.gettarinfo(link
)
607 self
.assertEqual(tarinfo
.size
, 0)
612 def test_symlink_size(self
):
613 if hasattr(os
, "symlink"):
614 path
= os
.path
.join(TEMPDIR
, "symlink")
615 os
.symlink("link_target", path
)
617 tar
= tarfile
.open(tmpname
, self
.mode
)
618 tarinfo
= tar
.gettarinfo(path
)
619 self
.assertEqual(tarinfo
.size
, 0)
623 def test_add_self(self
):
625 dstname
= os
.path
.abspath(tmpname
)
627 tar
= tarfile
.open(tmpname
, self
.mode
)
628 self
.assertTrue(tar
.name
== dstname
, "archive name must be absolute")
631 self
.assertTrue(tar
.getnames() == [], "added the archive to itself")
637 self
.assertTrue(tar
.getnames() == [], "added the archive to itself")
639 def test_exclude(self
):
640 tempdir
= os
.path
.join(TEMPDIR
, "exclude")
643 for name
in ("foo", "bar", "baz"):
644 name
= os
.path
.join(tempdir
, name
)
645 open(name
, "wb").close()
648 return os
.path
.isfile(name
)
650 tar
= tarfile
.open(tmpname
, self
.mode
, encoding
="iso8859-1")
651 tar
.add(tempdir
, arcname
="empty_dir", exclude
=exclude
)
654 tar
= tarfile
.open(tmpname
, "r")
655 self
.assertEqual(len(tar
.getmembers()), 1)
656 self
.assertEqual(tar
.getnames()[0], "empty_dir")
658 shutil
.rmtree(tempdir
)
660 def test_filter(self
):
661 tempdir
= os
.path
.join(TEMPDIR
, "filter")
664 for name
in ("foo", "bar", "baz"):
665 name
= os
.path
.join(tempdir
, name
)
666 open(name
, "wb").close()
669 if os
.path
.basename(tarinfo
.name
) == "bar":
672 tarinfo
.uname
= "foo"
675 tar
= tarfile
.open(tmpname
, self
.mode
, encoding
="iso8859-1")
676 tar
.add(tempdir
, arcname
="empty_dir", filter=filter)
679 tar
= tarfile
.open(tmpname
, "r")
681 self
.assertEqual(tarinfo
.uid
, 123)
682 self
.assertEqual(tarinfo
.uname
, "foo")
683 self
.assertEqual(len(tar
.getmembers()), 3)
686 shutil
.rmtree(tempdir
)
688 # Guarantee that stored pathnames are not modified. Don't
689 # remove ./ or ../ or double slashes. Still make absolute
690 # pathnames relative.
691 # For details see bug #6054.
692 def _test_pathname(self
, path
, cmp_path
=None, dir=False):
693 # Create a tarfile with an empty member named path
694 # and compare the stored name with the original.
695 foo
= os
.path
.join(TEMPDIR
, "foo")
697 open(foo
, "w").close()
701 tar
= tarfile
.open(tmpname
, self
.mode
)
702 tar
.add(foo
, arcname
=path
)
705 tar
= tarfile
.open(tmpname
, "r")
714 self
.assertEqual(t
.name
, cmp_path
or path
.replace(os
.sep
, "/"))
716 def test_pathnames(self
):
717 self
._test
_pathname
("foo")
718 self
._test
_pathname
(os
.path
.join("foo", ".", "bar"))
719 self
._test
_pathname
(os
.path
.join("foo", "..", "bar"))
720 self
._test
_pathname
(os
.path
.join(".", "foo"))
721 self
._test
_pathname
(os
.path
.join(".", "foo", "."))
722 self
._test
_pathname
(os
.path
.join(".", "foo", ".", "bar"))
723 self
._test
_pathname
(os
.path
.join(".", "foo", "..", "bar"))
724 self
._test
_pathname
(os
.path
.join(".", "foo", "..", "bar"))
725 self
._test
_pathname
(os
.path
.join("..", "foo"))
726 self
._test
_pathname
(os
.path
.join("..", "foo", ".."))
727 self
._test
_pathname
(os
.path
.join("..", "foo", ".", "bar"))
728 self
._test
_pathname
(os
.path
.join("..", "foo", "..", "bar"))
730 self
._test
_pathname
("foo" + os
.sep
+ os
.sep
+ "bar")
731 self
._test
_pathname
("foo" + os
.sep
+ os
.sep
, "foo", dir=True)
733 def test_abs_pathnames(self
):
734 if sys
.platform
== "win32":
735 self
._test
_pathname
("C:\\foo", "foo")
737 self
._test
_pathname
("/foo", "foo")
738 self
._test
_pathname
("///foo", "foo")
741 # Test adding the current working directory.
745 open("foo", "w").close()
747 tar
= tarfile
.open(tmpname
, self
.mode
)
751 tar
= tarfile
.open(tmpname
, "r")
753 self
.assert_(t
.name
== "." or t
.name
.startswith("./"))
759 class StreamWriteTest(WriteTestBase
):
763 def test_stream_padding(self
):
764 # Test for bug #1543303.
765 tar
= tarfile
.open(tmpname
, self
.mode
)
768 if self
.mode
.endswith("gz"):
769 fobj
= gzip
.GzipFile(tmpname
)
772 elif self
.mode
.endswith("bz2"):
773 dec
= bz2
.BZ2Decompressor()
774 data
= open(tmpname
, "rb").read()
775 data
= dec
.decompress(data
)
776 self
.assertTrue(len(dec
.unused_data
) == 0,
777 "found trailing data")
779 fobj
= open(tmpname
, "rb")
783 self
.assertTrue(data
.count("\0") == tarfile
.RECORDSIZE
,
784 "incorrect zero padding")
787 class GNUWriteTest(unittest
.TestCase
):
788 # This testcase checks for correct creation of GNU Longname
789 # and Longlink extended headers (cp. bug #812325).
791 def _length(self
, s
):
792 blocks
, remainder
= divmod(len(s
) + 1, 512)
797 def _calc_size(self
, name
, link
=None):
801 if len(name
) > tarfile
.LENGTH_NAME
:
802 # GNU longname extended header + longname
804 count
+= self
._length
(name
)
805 if link
is not None and len(link
) > tarfile
.LENGTH_LINK
:
806 # GNU longlink extended header + longlink
808 count
+= self
._length
(link
)
811 def _test(self
, name
, link
=None):
812 tarinfo
= tarfile
.TarInfo(name
)
814 tarinfo
.linkname
= link
815 tarinfo
.type = tarfile
.LNKTYPE
817 tar
= tarfile
.open(tmpname
, "w")
818 tar
.format
= tarfile
.GNU_FORMAT
821 v1
= self
._calc
_size
(name
, link
)
823 self
.assertTrue(v1
== v2
, "GNU longname/longlink creation failed")
827 tar
= tarfile
.open(tmpname
)
829 self
.assertFalse(member
is None, "unable to read longname member")
830 self
.assertTrue(tarinfo
.name
== member
.name
and \
831 tarinfo
.linkname
== member
.linkname
, \
832 "unable to read longname member")
834 def test_longname_1023(self
):
835 self
._test
(("longnam/" * 127) + "longnam")
837 def test_longname_1024(self
):
838 self
._test
(("longnam/" * 127) + "longname")
840 def test_longname_1025(self
):
841 self
._test
(("longnam/" * 127) + "longname_")
843 def test_longlink_1023(self
):
844 self
._test
("name", ("longlnk/" * 127) + "longlnk")
846 def test_longlink_1024(self
):
847 self
._test
("name", ("longlnk/" * 127) + "longlink")
849 def test_longlink_1025(self
):
850 self
._test
("name", ("longlnk/" * 127) + "longlink_")
852 def test_longnamelink_1023(self
):
853 self
._test
(("longnam/" * 127) + "longnam",
854 ("longlnk/" * 127) + "longlnk")
856 def test_longnamelink_1024(self
):
857 self
._test
(("longnam/" * 127) + "longname",
858 ("longlnk/" * 127) + "longlink")
860 def test_longnamelink_1025(self
):
861 self
._test
(("longnam/" * 127) + "longname_",
862 ("longlnk/" * 127) + "longlink_")
865 class HardlinkTest(unittest
.TestCase
):
866 # Test the creation of LNKTYPE (hardlink) members in an archive.
869 self
.foo
= os
.path
.join(TEMPDIR
, "foo")
870 self
.bar
= os
.path
.join(TEMPDIR
, "bar")
872 fobj
= open(self
.foo
, "wb")
876 os
.link(self
.foo
, self
.bar
)
878 self
.tar
= tarfile
.open(tmpname
, "w")
879 self
.tar
.add(self
.foo
)
886 def test_add_twice(self
):
887 # The same name will be added as a REGTYPE every
888 # time regardless of st_nlink.
889 tarinfo
= self
.tar
.gettarinfo(self
.foo
)
890 self
.assertTrue(tarinfo
.type == tarfile
.REGTYPE
,
891 "add file as regular failed")
893 def test_add_hardlink(self
):
894 tarinfo
= self
.tar
.gettarinfo(self
.bar
)
895 self
.assertTrue(tarinfo
.type == tarfile
.LNKTYPE
,
896 "add file as hardlink failed")
898 def test_dereference_hardlink(self
):
899 self
.tar
.dereference
= True
900 tarinfo
= self
.tar
.gettarinfo(self
.bar
)
901 self
.assertTrue(tarinfo
.type == tarfile
.REGTYPE
,
902 "dereferencing hardlink failed")
905 class PaxWriteTest(GNUWriteTest
):
907 def _test(self
, name
, link
=None):
909 tarinfo
= tarfile
.TarInfo(name
)
911 tarinfo
.linkname
= link
912 tarinfo
.type = tarfile
.LNKTYPE
914 tar
= tarfile
.open(tmpname
, "w", format
=tarfile
.PAX_FORMAT
)
918 tar
= tarfile
.open(tmpname
)
920 l
= tar
.getmembers()[0].linkname
921 self
.assertTrue(link
== l
, "PAX longlink creation failed")
923 n
= tar
.getmembers()[0].name
924 self
.assertTrue(name
== n
, "PAX longname creation failed")
926 def test_pax_global_header(self
):
934 tar
= tarfile
.open(tmpname
, "w", format
=tarfile
.PAX_FORMAT
, \
935 pax_headers
=pax_headers
)
936 tar
.addfile(tarfile
.TarInfo("test"))
939 # Test if the global header was written correctly.
940 tar
= tarfile
.open(tmpname
, encoding
="iso8859-1")
941 self
.assertEqual(tar
.pax_headers
, pax_headers
)
942 self
.assertEqual(tar
.getmembers()[0].pax_headers
, pax_headers
)
944 # Test if all the fields are unicode.
945 for key
, val
in tar
.pax_headers
.iteritems():
946 self
.assertTrue(type(key
) is unicode)
947 self
.assertTrue(type(val
) is unicode)
948 if key
in tarfile
.PAX_NUMBER_FIELDS
:
950 tarfile
.PAX_NUMBER_FIELDS
[key
](val
)
951 except (TypeError, ValueError):
952 self
.fail("unable to convert pax header field")
954 def test_pax_extended_header(self
):
955 # The fields from the pax header have priority over the
957 pax_headers
= {u
"path": u
"foo", u
"uid": u
"123"}
959 tar
= tarfile
.open(tmpname
, "w", format
=tarfile
.PAX_FORMAT
, encoding
="iso8859-1")
960 t
= tarfile
.TarInfo()
961 t
.name
= u
"äöü" # non-ASCII
962 t
.uid
= 8**8 # too large
963 t
.pax_headers
= pax_headers
967 tar
= tarfile
.open(tmpname
, encoding
="iso8859-1")
968 t
= tar
.getmembers()[0]
969 self
.assertEqual(t
.pax_headers
, pax_headers
)
970 self
.assertEqual(t
.name
, "foo")
971 self
.assertEqual(t
.uid
, 123)
974 class UstarUnicodeTest(unittest
.TestCase
):
975 # All *UnicodeTests FIXME
977 format
= tarfile
.USTAR_FORMAT
979 def test_iso8859_1_filename(self
):
980 self
._test
_unicode
_filename
("iso8859-1")
982 def test_utf7_filename(self
):
983 self
._test
_unicode
_filename
("utf7")
985 def test_utf8_filename(self
):
986 self
._test
_unicode
_filename
("utf8")
988 def _test_unicode_filename(self
, encoding
):
989 tar
= tarfile
.open(tmpname
, "w", format
=self
.format
, encoding
=encoding
, errors
="strict")
991 tar
.addfile(tarfile
.TarInfo(name
))
994 tar
= tarfile
.open(tmpname
, encoding
=encoding
)
995 self
.assertTrue(type(tar
.getnames()[0]) is not unicode)
996 self
.assertEqual(tar
.getmembers()[0].name
, name
.encode(encoding
))
999 def test_unicode_filename_error(self
):
1000 tar
= tarfile
.open(tmpname
, "w", format
=self
.format
, encoding
="ascii", errors
="strict")
1001 tarinfo
= tarfile
.TarInfo()
1003 tarinfo
.name
= "äöü"
1004 if self
.format
== tarfile
.PAX_FORMAT
:
1005 self
.assertRaises(UnicodeError, tar
.addfile
, tarinfo
)
1007 tar
.addfile(tarinfo
)
1009 tarinfo
.name
= u
"äöü"
1010 self
.assertRaises(UnicodeError, tar
.addfile
, tarinfo
)
1012 tarinfo
.name
= "foo"
1013 tarinfo
.uname
= u
"äöü"
1014 self
.assertRaises(UnicodeError, tar
.addfile
, tarinfo
)
1016 def test_unicode_argument(self
):
1017 tar
= tarfile
.open(tarname
, "r", encoding
="iso8859-1", errors
="strict")
1019 self
.assertTrue(type(t
.name
) is str)
1020 self
.assertTrue(type(t
.linkname
) is str)
1021 self
.assertTrue(type(t
.uname
) is str)
1022 self
.assertTrue(type(t
.gname
) is str)
1025 def test_uname_unicode(self
):
1026 for name
in (u
"äöü", "äöü"):
1027 t
= tarfile
.TarInfo("foo")
1031 fobj
= StringIO
.StringIO()
1032 tar
= tarfile
.open("foo.tar", mode
="w", fileobj
=fobj
, format
=self
.format
, encoding
="iso8859-1")
1037 tar
= tarfile
.open("foo.tar", fileobj
=fobj
, encoding
="iso8859-1")
1038 t
= tar
.getmember("foo")
1039 self
.assertEqual(t
.uname
, "äöü")
1040 self
.assertEqual(t
.gname
, "äöü")
1043 class GNUUnicodeTest(UstarUnicodeTest
):
1045 format
= tarfile
.GNU_FORMAT
1048 class PaxUnicodeTest(UstarUnicodeTest
):
1050 format
= tarfile
.PAX_FORMAT
1052 def _create_unicode_name(self
, name
):
1053 tar
= tarfile
.open(tmpname
, "w", format
=self
.format
)
1054 t
= tarfile
.TarInfo()
1055 t
.pax_headers
["path"] = name
1059 def test_error_handlers(self
):
1060 # Test if the unicode error handlers work correctly for characters
1061 # that cannot be expressed in a given encoding.
1062 self
._create
_unicode
_name
(u
"äöü")
1064 for handler
, name
in (("utf-8", u
"äöü".encode("utf8")),
1065 ("replace", "???"), ("ignore", "")):
1066 tar
= tarfile
.open(tmpname
, format
=self
.format
, encoding
="ascii",
1068 self
.assertEqual(tar
.getnames()[0], name
)
1070 self
.assertRaises(UnicodeError, tarfile
.open, tmpname
,
1071 encoding
="ascii", errors
="strict")
1073 def test_error_handler_utf8(self
):
1074 # Create a pathname that has one component representable using
1075 # iso8859-1 and the other only in iso8859-15.
1076 self
._create
_unicode
_name
(u
"äöü/¤")
1078 tar
= tarfile
.open(tmpname
, format
=self
.format
, encoding
="iso8859-1",
1080 self
.assertEqual(tar
.getnames()[0], "äöü/" + u
"¤".encode("utf8"))
1083 class AppendTest(unittest
.TestCase
):
1084 # Test append mode (cp. patch #1652681).
1087 self
.tarname
= tmpname
1088 if os
.path
.exists(self
.tarname
):
1089 os
.remove(self
.tarname
)
1091 def _add_testfile(self
, fileobj
=None):
1092 tar
= tarfile
.open(self
.tarname
, "a", fileobj
=fileobj
)
1093 tar
.addfile(tarfile
.TarInfo("bar"))
1096 def _create_testtar(self
, mode
="w:"):
1097 src
= tarfile
.open(tarname
, encoding
="iso8859-1")
1098 t
= src
.getmember("ustar/regtype")
1100 f
= src
.extractfile(t
)
1101 tar
= tarfile
.open(self
.tarname
, mode
)
1105 def _test(self
, names
=["bar"], fileobj
=None):
1106 tar
= tarfile
.open(self
.tarname
, fileobj
=fileobj
)
1107 self
.assertEqual(tar
.getnames(), names
)
1109 def test_non_existing(self
):
1110 self
._add
_testfile
()
1113 def test_empty(self
):
1114 open(self
.tarname
, "w").close()
1115 self
._add
_testfile
()
1118 def test_empty_fileobj(self
):
1119 fobj
= StringIO
.StringIO()
1120 self
._add
_testfile
(fobj
)
1122 self
._test
(fileobj
=fobj
)
1124 def test_fileobj(self
):
1125 self
._create
_testtar
()
1126 data
= open(self
.tarname
).read()
1127 fobj
= StringIO
.StringIO(data
)
1128 self
._add
_testfile
(fobj
)
1130 self
._test
(names
=["foo", "bar"], fileobj
=fobj
)
1132 def test_existing(self
):
1133 self
._create
_testtar
()
1134 self
._add
_testfile
()
1135 self
._test
(names
=["foo", "bar"])
1137 def test_append_gz(self
):
1140 self
._create
_testtar
("w:gz")
1141 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tmpname
, "a")
1143 def test_append_bz2(self
):
1146 self
._create
_testtar
("w:bz2")
1147 self
.assertRaises(tarfile
.ReadError
, tarfile
.open, tmpname
, "a")
1150 class LimitsTest(unittest
.TestCase
):
1152 def test_ustar_limits(self
):
1154 tarinfo
= tarfile
.TarInfo("0123456789" * 10)
1155 tarinfo
.tobuf(tarfile
.USTAR_FORMAT
)
1157 # 101 char name that cannot be stored
1158 tarinfo
= tarfile
.TarInfo("0123456789" * 10 + "0")
1159 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.USTAR_FORMAT
)
1161 # 256 char name with a slash at pos 156
1162 tarinfo
= tarfile
.TarInfo("123/" * 62 + "longname")
1163 tarinfo
.tobuf(tarfile
.USTAR_FORMAT
)
1165 # 256 char name that cannot be stored
1166 tarinfo
= tarfile
.TarInfo("1234567/" * 31 + "longname")
1167 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.USTAR_FORMAT
)
1170 tarinfo
= tarfile
.TarInfo("123/" * 126 + "longname")
1171 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.USTAR_FORMAT
)
1174 tarinfo
= tarfile
.TarInfo("longlink")
1175 tarinfo
.linkname
= "123/" * 126 + "longname"
1176 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.USTAR_FORMAT
)
1179 tarinfo
= tarfile
.TarInfo("name")
1180 tarinfo
.uid
= 010000000
1181 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.USTAR_FORMAT
)
1183 def test_gnu_limits(self
):
1184 tarinfo
= tarfile
.TarInfo("123/" * 126 + "longname")
1185 tarinfo
.tobuf(tarfile
.GNU_FORMAT
)
1187 tarinfo
= tarfile
.TarInfo("longlink")
1188 tarinfo
.linkname
= "123/" * 126 + "longname"
1189 tarinfo
.tobuf(tarfile
.GNU_FORMAT
)
1192 tarinfo
= tarfile
.TarInfo("name")
1193 tarinfo
.uid
= 04000000000000000000L
1194 self
.assertRaises(ValueError, tarinfo
.tobuf
, tarfile
.GNU_FORMAT
)
1196 def test_pax_limits(self
):
1197 tarinfo
= tarfile
.TarInfo("123/" * 126 + "longname")
1198 tarinfo
.tobuf(tarfile
.PAX_FORMAT
)
1200 tarinfo
= tarfile
.TarInfo("longlink")
1201 tarinfo
.linkname
= "123/" * 126 + "longname"
1202 tarinfo
.tobuf(tarfile
.PAX_FORMAT
)
1204 tarinfo
= tarfile
.TarInfo("name")
1205 tarinfo
.uid
= 04000000000000000000L
1206 tarinfo
.tobuf(tarfile
.PAX_FORMAT
)
1209 class GzipMiscReadTest(MiscReadTest
):
1212 class GzipUstarReadTest(UstarReadTest
):
1215 class GzipStreamReadTest(StreamReadTest
):
1218 class GzipWriteTest(WriteTest
):
1220 class GzipStreamWriteTest(StreamWriteTest
):
1224 class Bz2MiscReadTest(MiscReadTest
):
1227 class Bz2UstarReadTest(UstarReadTest
):
1230 class Bz2StreamReadTest(StreamReadTest
):
1233 class Bz2WriteTest(WriteTest
):
1235 class Bz2StreamWriteTest(StreamWriteTest
):
1238 class Bz2PartialReadTest(unittest
.TestCase
):
1239 # Issue5068: The _BZ2Proxy.read() method loops forever
1240 # on an empty or partial bzipped file.
1242 def _test_partial_input(self
, mode
):
1243 class MyStringIO(StringIO
.StringIO
):
1247 raise AssertionError("infinite loop detected in tarfile.open()")
1248 self
.hit_eof
= self
.pos
== self
.len
1249 return StringIO
.StringIO
.read(self
, n
)
1251 data
= bz2
.compress(tarfile
.TarInfo("foo").tobuf())
1252 for x
in range(len(data
) + 1):
1253 tarfile
.open(fileobj
=MyStringIO(data
[:x
]), mode
=mode
)
1255 def test_partial_input(self
):
1256 self
._test
_partial
_input
("r")
1258 def test_partial_input_bz2(self
):
1259 self
._test
_partial
_input
("r:bz2")
1263 os
.makedirs(TEMPDIR
)
1284 if hasattr(os
, "link"):
1285 tests
.append(HardlinkTest
)
1287 fobj
= open(tarname
, "rb")
1292 # Create testtar.tar.gz and add gzip-specific tests.
1293 tar
= gzip
.open(gzipname
, "wb")
1302 GzipStreamWriteTest
,
1306 # Create testtar.tar.bz2 and add bz2-specific tests.
1307 tar
= bz2
.BZ2File(bz2name
, "wb")
1321 test_support
.run_unittest(*tests
)
1323 if os
.path
.exists(TEMPDIR
):
1324 shutil
.rmtree(TEMPDIR
)
1326 if __name__
== "__main__":