move sections
[python/dscho.git] / Lib / test / test_shutil.py
blobd6839359bd712d0105223a632a682f8072368c78
1 # Copyright (C) 2003 Python Software Foundation
3 import unittest
4 import shutil
5 import tempfile
6 import sys
7 import stat
8 import os
9 import os.path
10 from os.path import splitdrive
11 from distutils.spawn import find_executable, spawn
12 from shutil import (_make_tarball, _make_zipfile, make_archive,
13 register_archive_format, unregister_archive_format,
14 get_archive_formats)
15 import tarfile
16 import warnings
18 from test import test_support
19 from test.test_support import TESTFN, check_warnings, captured_stdout
21 TESTFN2 = TESTFN + "2"
23 try:
24 import grp
25 import pwd
26 UID_GID_SUPPORT = True
27 except ImportError:
28 UID_GID_SUPPORT = False
30 try:
31 import zlib
32 except ImportError:
33 zlib = None
35 try:
36 import zipfile
37 ZIP_SUPPORT = True
38 except ImportError:
39 ZIP_SUPPORT = find_executable('zip')
41 class TestShutil(unittest.TestCase):
43 def setUp(self):
44 super(TestShutil, self).setUp()
45 self.tempdirs = []
47 def tearDown(self):
48 super(TestShutil, self).tearDown()
49 while self.tempdirs:
50 d = self.tempdirs.pop()
51 shutil.rmtree(d, os.name in ('nt', 'cygwin'))
53 def write_file(self, path, content='xxx'):
54 """Writes a file in the given path.
57 path can be a string or a sequence.
58 """
59 if isinstance(path, (list, tuple)):
60 path = os.path.join(*path)
61 f = open(path, 'w')
62 try:
63 f.write(content)
64 finally:
65 f.close()
67 def mkdtemp(self):
68 """Create a temporary directory that will be cleaned up.
70 Returns the path of the directory.
71 """
72 d = tempfile.mkdtemp()
73 self.tempdirs.append(d)
74 return d
75 def test_rmtree_errors(self):
76 # filename is guaranteed not to exist
77 filename = tempfile.mktemp()
78 self.assertRaises(OSError, shutil.rmtree, filename)
80 # See bug #1071513 for why we don't run this on cygwin
81 # and bug #1076467 for why we don't run this as root.
82 if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
83 and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
84 def test_on_error(self):
85 self.errorState = 0
86 os.mkdir(TESTFN)
87 self.childpath = os.path.join(TESTFN, 'a')
88 f = open(self.childpath, 'w')
89 f.close()
90 old_dir_mode = os.stat(TESTFN).st_mode
91 old_child_mode = os.stat(self.childpath).st_mode
92 # Make unwritable.
93 os.chmod(self.childpath, stat.S_IREAD)
94 os.chmod(TESTFN, stat.S_IREAD)
96 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
97 # Test whether onerror has actually been called.
98 self.assertEqual(self.errorState, 2,
99 "Expected call to onerror function did not happen.")
101 # Make writable again.
102 os.chmod(TESTFN, old_dir_mode)
103 os.chmod(self.childpath, old_child_mode)
105 # Clean up.
106 shutil.rmtree(TESTFN)
108 def check_args_to_onerror(self, func, arg, exc):
109 # test_rmtree_errors deliberately runs rmtree
110 # on a directory that is chmod 400, which will fail.
111 # This function is run when shutil.rmtree fails.
112 # 99.9% of the time it initially fails to remove
113 # a file in the directory, so the first time through
114 # func is os.remove.
115 # However, some Linux machines running ZFS on
116 # FUSE experienced a failure earlier in the process
117 # at os.listdir. The first failure may legally
118 # be either.
119 if self.errorState == 0:
120 if func is os.remove:
121 self.assertEqual(arg, self.childpath)
122 else:
123 self.assertIs(func, os.listdir,
124 "func must be either os.remove or os.listdir")
125 self.assertEqual(arg, TESTFN)
126 self.assertTrue(issubclass(exc[0], OSError))
127 self.errorState = 1
128 else:
129 self.assertEqual(func, os.rmdir)
130 self.assertEqual(arg, TESTFN)
131 self.assertTrue(issubclass(exc[0], OSError))
132 self.errorState = 2
134 def test_rmtree_dont_delete_file(self):
135 # When called on a file instead of a directory, don't delete it.
136 handle, path = tempfile.mkstemp()
137 os.fdopen(handle).close()
138 self.assertRaises(OSError, shutil.rmtree, path)
139 os.remove(path)
141 def test_copytree_simple(self):
142 def write_data(path, data):
143 f = open(path, "w")
144 f.write(data)
145 f.close()
147 def read_data(path):
148 f = open(path)
149 data = f.read()
150 f.close()
151 return data
153 src_dir = tempfile.mkdtemp()
154 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
156 write_data(os.path.join(src_dir, 'test.txt'), '123')
158 os.mkdir(os.path.join(src_dir, 'test_dir'))
159 write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
161 try:
162 shutil.copytree(src_dir, dst_dir)
163 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
164 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
165 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
166 'test.txt')))
167 actual = read_data(os.path.join(dst_dir, 'test.txt'))
168 self.assertEqual(actual, '123')
169 actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
170 self.assertEqual(actual, '456')
171 finally:
172 for path in (
173 os.path.join(src_dir, 'test.txt'),
174 os.path.join(dst_dir, 'test.txt'),
175 os.path.join(src_dir, 'test_dir', 'test.txt'),
176 os.path.join(dst_dir, 'test_dir', 'test.txt'),
178 if os.path.exists(path):
179 os.remove(path)
180 for path in (src_dir,
181 os.path.dirname(dst_dir)
183 if os.path.exists(path):
184 shutil.rmtree(path)
186 def test_copytree_with_exclude(self):
188 def write_data(path, data):
189 f = open(path, "w")
190 f.write(data)
191 f.close()
193 def read_data(path):
194 f = open(path)
195 data = f.read()
196 f.close()
197 return data
199 # creating data
200 join = os.path.join
201 exists = os.path.exists
202 src_dir = tempfile.mkdtemp()
203 try:
204 dst_dir = join(tempfile.mkdtemp(), 'destination')
205 write_data(join(src_dir, 'test.txt'), '123')
206 write_data(join(src_dir, 'test.tmp'), '123')
207 os.mkdir(join(src_dir, 'test_dir'))
208 write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
209 os.mkdir(join(src_dir, 'test_dir2'))
210 write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
211 os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
212 os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
213 write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
214 write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
217 # testing glob-like patterns
218 try:
219 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
220 shutil.copytree(src_dir, dst_dir, ignore=patterns)
221 # checking the result: some elements should not be copied
222 self.assertTrue(exists(join(dst_dir, 'test.txt')))
223 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
224 self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
225 finally:
226 if os.path.exists(dst_dir):
227 shutil.rmtree(dst_dir)
228 try:
229 patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
230 shutil.copytree(src_dir, dst_dir, ignore=patterns)
231 # checking the result: some elements should not be copied
232 self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
233 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
234 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
235 finally:
236 if os.path.exists(dst_dir):
237 shutil.rmtree(dst_dir)
239 # testing callable-style
240 try:
241 def _filter(src, names):
242 res = []
243 for name in names:
244 path = os.path.join(src, name)
246 if (os.path.isdir(path) and
247 path.split()[-1] == 'subdir'):
248 res.append(name)
249 elif os.path.splitext(path)[-1] in ('.py'):
250 res.append(name)
251 return res
253 shutil.copytree(src_dir, dst_dir, ignore=_filter)
255 # checking the result: some elements should not be copied
256 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
257 'test.py')))
258 self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
260 finally:
261 if os.path.exists(dst_dir):
262 shutil.rmtree(dst_dir)
263 finally:
264 shutil.rmtree(src_dir)
265 shutil.rmtree(os.path.dirname(dst_dir))
267 if hasattr(os, "symlink"):
268 def test_dont_copy_file_onto_link_to_itself(self):
269 # bug 851123.
270 os.mkdir(TESTFN)
271 src = os.path.join(TESTFN, 'cheese')
272 dst = os.path.join(TESTFN, 'shop')
273 try:
274 f = open(src, 'w')
275 f.write('cheddar')
276 f.close()
278 os.link(src, dst)
279 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
280 self.assertEqual(open(src,'r').read(), 'cheddar')
281 os.remove(dst)
283 # Using `src` here would mean we end up with a symlink pointing
284 # to TESTFN/TESTFN/cheese, while it should point at
285 # TESTFN/cheese.
286 os.symlink('cheese', dst)
287 self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
288 self.assertEqual(open(src,'r').read(), 'cheddar')
289 os.remove(dst)
290 finally:
291 try:
292 shutil.rmtree(TESTFN)
293 except OSError:
294 pass
296 def test_rmtree_on_symlink(self):
297 # bug 1669.
298 os.mkdir(TESTFN)
299 try:
300 src = os.path.join(TESTFN, 'cheese')
301 dst = os.path.join(TESTFN, 'shop')
302 os.mkdir(src)
303 os.symlink(src, dst)
304 self.assertRaises(OSError, shutil.rmtree, dst)
305 finally:
306 shutil.rmtree(TESTFN, ignore_errors=True)
308 if hasattr(os, "mkfifo"):
309 # Issue #3002: copyfile and copytree block indefinitely on named pipes
310 def test_copyfile_named_pipe(self):
311 os.mkfifo(TESTFN)
312 try:
313 self.assertRaises(shutil.SpecialFileError,
314 shutil.copyfile, TESTFN, TESTFN2)
315 self.assertRaises(shutil.SpecialFileError,
316 shutil.copyfile, __file__, TESTFN)
317 finally:
318 os.remove(TESTFN)
320 def test_copytree_named_pipe(self):
321 os.mkdir(TESTFN)
322 try:
323 subdir = os.path.join(TESTFN, "subdir")
324 os.mkdir(subdir)
325 pipe = os.path.join(subdir, "mypipe")
326 os.mkfifo(pipe)
327 try:
328 shutil.copytree(TESTFN, TESTFN2)
329 except shutil.Error as e:
330 errors = e.args[0]
331 self.assertEqual(len(errors), 1)
332 src, dst, error_msg = errors[0]
333 self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
334 else:
335 self.fail("shutil.Error should have been raised")
336 finally:
337 shutil.rmtree(TESTFN, ignore_errors=True)
338 shutil.rmtree(TESTFN2, ignore_errors=True)
340 @unittest.skipUnless(zlib, "requires zlib")
341 def test_make_tarball(self):
342 # creating something to tar
343 tmpdir = self.mkdtemp()
344 self.write_file([tmpdir, 'file1'], 'xxx')
345 self.write_file([tmpdir, 'file2'], 'xxx')
346 os.mkdir(os.path.join(tmpdir, 'sub'))
347 self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
349 tmpdir2 = self.mkdtemp()
350 unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
351 "source and target should be on same drive")
353 base_name = os.path.join(tmpdir2, 'archive')
355 # working with relative paths to avoid tar warnings
356 old_dir = os.getcwd()
357 os.chdir(tmpdir)
358 try:
359 _make_tarball(splitdrive(base_name)[1], '.')
360 finally:
361 os.chdir(old_dir)
363 # check if the compressed tarball was created
364 tarball = base_name + '.tar.gz'
365 self.assertTrue(os.path.exists(tarball))
367 # trying an uncompressed one
368 base_name = os.path.join(tmpdir2, 'archive')
369 old_dir = os.getcwd()
370 os.chdir(tmpdir)
371 try:
372 _make_tarball(splitdrive(base_name)[1], '.', compress=None)
373 finally:
374 os.chdir(old_dir)
375 tarball = base_name + '.tar'
376 self.assertTrue(os.path.exists(tarball))
378 def _tarinfo(self, path):
379 tar = tarfile.open(path)
380 try:
381 names = tar.getnames()
382 names.sort()
383 return tuple(names)
384 finally:
385 tar.close()
387 def _create_files(self):
388 # creating something to tar
389 tmpdir = self.mkdtemp()
390 dist = os.path.join(tmpdir, 'dist')
391 os.mkdir(dist)
392 self.write_file([dist, 'file1'], 'xxx')
393 self.write_file([dist, 'file2'], 'xxx')
394 os.mkdir(os.path.join(dist, 'sub'))
395 self.write_file([dist, 'sub', 'file3'], 'xxx')
396 os.mkdir(os.path.join(dist, 'sub2'))
397 tmpdir2 = self.mkdtemp()
398 base_name = os.path.join(tmpdir2, 'archive')
399 return tmpdir, tmpdir2, base_name
401 @unittest.skipUnless(zlib, "Requires zlib")
402 @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
403 'Need the tar command to run')
404 def test_tarfile_vs_tar(self):
405 tmpdir, tmpdir2, base_name = self._create_files()
406 old_dir = os.getcwd()
407 os.chdir(tmpdir)
408 try:
409 _make_tarball(base_name, 'dist')
410 finally:
411 os.chdir(old_dir)
413 # check if the compressed tarball was created
414 tarball = base_name + '.tar.gz'
415 self.assertTrue(os.path.exists(tarball))
417 # now create another tarball using `tar`
418 tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
419 tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
420 gzip_cmd = ['gzip', '-f9', 'archive2.tar']
421 old_dir = os.getcwd()
422 os.chdir(tmpdir)
423 try:
424 with captured_stdout() as s:
425 spawn(tar_cmd)
426 spawn(gzip_cmd)
427 finally:
428 os.chdir(old_dir)
430 self.assertTrue(os.path.exists(tarball2))
431 # let's compare both tarballs
432 self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2))
434 # trying an uncompressed one
435 base_name = os.path.join(tmpdir2, 'archive')
436 old_dir = os.getcwd()
437 os.chdir(tmpdir)
438 try:
439 _make_tarball(base_name, 'dist', compress=None)
440 finally:
441 os.chdir(old_dir)
442 tarball = base_name + '.tar'
443 self.assertTrue(os.path.exists(tarball))
445 # now for a dry_run
446 base_name = os.path.join(tmpdir2, 'archive')
447 old_dir = os.getcwd()
448 os.chdir(tmpdir)
449 try:
450 _make_tarball(base_name, 'dist', compress=None, dry_run=True)
451 finally:
452 os.chdir(old_dir)
453 tarball = base_name + '.tar'
454 self.assertTrue(os.path.exists(tarball))
456 @unittest.skipUnless(zlib, "Requires zlib")
457 @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
458 def test_make_zipfile(self):
459 # creating something to tar
460 tmpdir = self.mkdtemp()
461 self.write_file([tmpdir, 'file1'], 'xxx')
462 self.write_file([tmpdir, 'file2'], 'xxx')
464 tmpdir2 = self.mkdtemp()
465 base_name = os.path.join(tmpdir2, 'archive')
466 _make_zipfile(base_name, tmpdir)
468 # check if the compressed tarball was created
469 tarball = base_name + '.zip'
472 def test_make_archive(self):
473 tmpdir = self.mkdtemp()
474 base_name = os.path.join(tmpdir, 'archive')
475 self.assertRaises(ValueError, make_archive, base_name, 'xxx')
477 @unittest.skipUnless(zlib, "Requires zlib")
478 def test_make_archive_owner_group(self):
479 # testing make_archive with owner and group, with various combinations
480 # this works even if there's not gid/uid support
481 if UID_GID_SUPPORT:
482 group = grp.getgrgid(0)[0]
483 owner = pwd.getpwuid(0)[0]
484 else:
485 group = owner = 'root'
487 base_dir, root_dir, base_name = self._create_files()
488 base_name = os.path.join(self.mkdtemp() , 'archive')
489 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
490 group=group)
491 self.assertTrue(os.path.exists(res))
493 res = make_archive(base_name, 'zip', root_dir, base_dir)
494 self.assertTrue(os.path.exists(res))
496 res = make_archive(base_name, 'tar', root_dir, base_dir,
497 owner=owner, group=group)
498 self.assertTrue(os.path.exists(res))
500 res = make_archive(base_name, 'tar', root_dir, base_dir,
501 owner='kjhkjhkjg', group='oihohoh')
502 self.assertTrue(os.path.exists(res))
504 @unittest.skipUnless(zlib, "Requires zlib")
505 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
506 def test_tarfile_root_owner(self):
507 tmpdir, tmpdir2, base_name = self._create_files()
508 old_dir = os.getcwd()
509 os.chdir(tmpdir)
510 group = grp.getgrgid(0)[0]
511 owner = pwd.getpwuid(0)[0]
512 try:
513 archive_name = _make_tarball(base_name, 'dist', compress=None,
514 owner=owner, group=group)
515 finally:
516 os.chdir(old_dir)
518 # check if the compressed tarball was created
519 self.assertTrue(os.path.exists(archive_name))
521 # now checks the rights
522 archive = tarfile.open(archive_name)
523 try:
524 for member in archive.getmembers():
525 self.assertEquals(member.uid, 0)
526 self.assertEquals(member.gid, 0)
527 finally:
528 archive.close()
530 def test_make_archive_cwd(self):
531 current_dir = os.getcwd()
532 def _breaks(*args, **kw):
533 raise RuntimeError()
535 register_archive_format('xxx', _breaks, [], 'xxx file')
536 try:
537 try:
538 make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
539 except Exception:
540 pass
541 self.assertEquals(os.getcwd(), current_dir)
542 finally:
543 unregister_archive_format('xxx')
545 def test_register_archive_format(self):
547 self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
548 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
550 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
551 [(1, 2), (1, 2, 3)])
553 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
554 formats = [name for name, params in get_archive_formats()]
555 self.assertIn('xxx', formats)
557 unregister_archive_format('xxx')
558 formats = [name for name, params in get_archive_formats()]
559 self.assertNotIn('xxx', formats)
562 class TestMove(unittest.TestCase):
564 def setUp(self):
565 filename = "foo"
566 self.src_dir = tempfile.mkdtemp()
567 self.dst_dir = tempfile.mkdtemp()
568 self.src_file = os.path.join(self.src_dir, filename)
569 self.dst_file = os.path.join(self.dst_dir, filename)
570 # Try to create a dir in the current directory, hoping that it is
571 # not located on the same filesystem as the system tmp dir.
572 try:
573 self.dir_other_fs = tempfile.mkdtemp(
574 dir=os.path.dirname(__file__))
575 self.file_other_fs = os.path.join(self.dir_other_fs,
576 filename)
577 except OSError:
578 self.dir_other_fs = None
579 with open(self.src_file, "wb") as f:
580 f.write("spam")
582 def tearDown(self):
583 for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
584 try:
585 if d:
586 shutil.rmtree(d)
587 except:
588 pass
590 def _check_move_file(self, src, dst, real_dst):
591 contents = open(src, "rb").read()
592 shutil.move(src, dst)
593 self.assertEqual(contents, open(real_dst, "rb").read())
594 self.assertFalse(os.path.exists(src))
596 def _check_move_dir(self, src, dst, real_dst):
597 contents = sorted(os.listdir(src))
598 shutil.move(src, dst)
599 self.assertEqual(contents, sorted(os.listdir(real_dst)))
600 self.assertFalse(os.path.exists(src))
602 def test_move_file(self):
603 # Move a file to another location on the same filesystem.
604 self._check_move_file(self.src_file, self.dst_file, self.dst_file)
606 def test_move_file_to_dir(self):
607 # Move a file inside an existing dir on the same filesystem.
608 self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
610 def test_move_file_other_fs(self):
611 # Move a file to an existing dir on another filesystem.
612 if not self.dir_other_fs:
613 # skip
614 return
615 self._check_move_file(self.src_file, self.file_other_fs,
616 self.file_other_fs)
618 def test_move_file_to_dir_other_fs(self):
619 # Move a file to another location on another filesystem.
620 if not self.dir_other_fs:
621 # skip
622 return
623 self._check_move_file(self.src_file, self.dir_other_fs,
624 self.file_other_fs)
626 def test_move_dir(self):
627 # Move a dir to another location on the same filesystem.
628 dst_dir = tempfile.mktemp()
629 try:
630 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
631 finally:
632 try:
633 shutil.rmtree(dst_dir)
634 except:
635 pass
637 def test_move_dir_other_fs(self):
638 # Move a dir to another location on another filesystem.
639 if not self.dir_other_fs:
640 # skip
641 return
642 dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
643 try:
644 self._check_move_dir(self.src_dir, dst_dir, dst_dir)
645 finally:
646 try:
647 shutil.rmtree(dst_dir)
648 except:
649 pass
651 def test_move_dir_to_dir(self):
652 # Move a dir inside an existing dir on the same filesystem.
653 self._check_move_dir(self.src_dir, self.dst_dir,
654 os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
656 def test_move_dir_to_dir_other_fs(self):
657 # Move a dir inside an existing dir on another filesystem.
658 if not self.dir_other_fs:
659 # skip
660 return
661 self._check_move_dir(self.src_dir, self.dir_other_fs,
662 os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
664 def test_existing_file_inside_dest_dir(self):
665 # A file with the same name inside the destination dir already exists.
666 with open(self.dst_file, "wb"):
667 pass
668 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
670 def test_dont_move_dir_in_itself(self):
671 # Moving a dir inside itself raises an Error.
672 dst = os.path.join(self.src_dir, "bar")
673 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
675 def test_destinsrc_false_negative(self):
676 os.mkdir(TESTFN)
677 try:
678 for src, dst in [('srcdir', 'srcdir/dest')]:
679 src = os.path.join(TESTFN, src)
680 dst = os.path.join(TESTFN, dst)
681 self.assertTrue(shutil._destinsrc(src, dst),
682 msg='_destinsrc() wrongly concluded that '
683 'dst (%s) is not in src (%s)' % (dst, src))
684 finally:
685 shutil.rmtree(TESTFN, ignore_errors=True)
687 def test_destinsrc_false_positive(self):
688 os.mkdir(TESTFN)
689 try:
690 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
691 src = os.path.join(TESTFN, src)
692 dst = os.path.join(TESTFN, dst)
693 self.assertFalse(shutil._destinsrc(src, dst),
694 msg='_destinsrc() wrongly concluded that '
695 'dst (%s) is in src (%s)' % (dst, src))
696 finally:
697 shutil.rmtree(TESTFN, ignore_errors=True)
700 class TestCopyFile(unittest.TestCase):
702 _delete = False
704 class Faux(object):
705 _entered = False
706 _exited_with = None
707 _raised = False
708 def __init__(self, raise_in_exit=False, suppress_at_exit=True):
709 self._raise_in_exit = raise_in_exit
710 self._suppress_at_exit = suppress_at_exit
711 def read(self, *args):
712 return ''
713 def __enter__(self):
714 self._entered = True
715 def __exit__(self, exc_type, exc_val, exc_tb):
716 self._exited_with = exc_type, exc_val, exc_tb
717 if self._raise_in_exit:
718 self._raised = True
719 raise IOError("Cannot close")
720 return self._suppress_at_exit
722 def tearDown(self):
723 if self._delete:
724 del shutil.open
726 def _set_shutil_open(self, func):
727 shutil.open = func
728 self._delete = True
730 def test_w_source_open_fails(self):
731 def _open(filename, mode='r'):
732 if filename == 'srcfile':
733 raise IOError('Cannot open "srcfile"')
734 assert 0 # shouldn't reach here.
736 self._set_shutil_open(_open)
738 self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
740 def test_w_dest_open_fails(self):
742 srcfile = self.Faux()
744 def _open(filename, mode='r'):
745 if filename == 'srcfile':
746 return srcfile
747 if filename == 'destfile':
748 raise IOError('Cannot open "destfile"')
749 assert 0 # shouldn't reach here.
751 self._set_shutil_open(_open)
753 shutil.copyfile('srcfile', 'destfile')
754 self.assertTrue(srcfile._entered)
755 self.assertTrue(srcfile._exited_with[0] is IOError)
756 self.assertEqual(srcfile._exited_with[1].args,
757 ('Cannot open "destfile"',))
759 def test_w_dest_close_fails(self):
761 srcfile = self.Faux()
762 destfile = self.Faux(True)
764 def _open(filename, mode='r'):
765 if filename == 'srcfile':
766 return srcfile
767 if filename == 'destfile':
768 return destfile
769 assert 0 # shouldn't reach here.
771 self._set_shutil_open(_open)
773 shutil.copyfile('srcfile', 'destfile')
774 self.assertTrue(srcfile._entered)
775 self.assertTrue(destfile._entered)
776 self.assertTrue(destfile._raised)
777 self.assertTrue(srcfile._exited_with[0] is IOError)
778 self.assertEqual(srcfile._exited_with[1].args,
779 ('Cannot close',))
781 def test_w_source_close_fails(self):
783 srcfile = self.Faux(True)
784 destfile = self.Faux()
786 def _open(filename, mode='r'):
787 if filename == 'srcfile':
788 return srcfile
789 if filename == 'destfile':
790 return destfile
791 assert 0 # shouldn't reach here.
793 self._set_shutil_open(_open)
795 self.assertRaises(IOError,
796 shutil.copyfile, 'srcfile', 'destfile')
797 self.assertTrue(srcfile._entered)
798 self.assertTrue(destfile._entered)
799 self.assertFalse(destfile._raised)
800 self.assertTrue(srcfile._exited_with[0] is None)
801 self.assertTrue(srcfile._raised)
804 def test_main():
805 test_support.run_unittest(TestShutil, TestMove, TestCopyFile)
807 if __name__ == '__main__':
808 test_main()