add release date
[python/dscho.git] / Lib / shutil.py
blob1d86df36c26b129b7beae2d52ac276e6b30f7341
1 """Utility functions for copying and archiving files and directory trees.
3 XXX The functions here don't copy the resource fork or other metadata on Mac.
5 """
7 import os
8 import sys
9 import stat
10 from os.path import abspath
11 import fnmatch
12 import collections
13 import errno
15 try:
16 from pwd import getpwnam
17 except ImportError:
18 getpwnam = None
20 try:
21 from grp import getgrnam
22 except ImportError:
23 getgrnam = None
# Names exported by "from shutil import *".  ignore_patterns is included
# because it is the public helper meant to be passed as copytree()'s
# ``ignore`` argument.
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "ignore_patterns"]
class Error(EnvironmentError):
    """Error raised by the copy and move helpers of this module
    (e.g. copyfile(), copytree() and move())."""
class SpecialFileError(EnvironmentError):
    """Signals that an operation (such as copying) was attempted on a
    special file (such as a named pipe) that does not support it."""
class ExecError(EnvironmentError):
    """Signals that an external command could not be executed."""
# WindowsError is a builtin only on Windows.  Elsewhere bind the name to
# None so that code in this module can test "WindowsError is not None"
# before using it in an isinstance() check.
try:
    WindowsError
except NameError:
    WindowsError = None
def copyfileobj(fsrc, fdst, length=16*1024):
    """Copy the contents of file-like object fsrc to file-like object fdst.

    Data is transferred in chunks of at most 'length' units, reading from
    fsrc until read() returns an empty result.
    """
    chunk = fsrc.read(length)
    while chunk:
        fdst.write(chunk)
        chunk = fsrc.read(length)
53 def _samefile(src, dst):
54 # Macintosh, Unix.
55 if hasattr(os.path, 'samefile'):
56 try:
57 return os.path.samefile(src, dst)
58 except OSError:
59 return False
61 # All other platforms: check for same pathname.
62 return (os.path.normcase(os.path.abspath(src)) ==
63 os.path.normcase(os.path.abspath(dst)))
def copyfile(src, dst):
    """Copy the contents of the file named src to the file named dst.

    Raises Error when src and dst are the same file, and SpecialFileError
    when either of them is a named pipe.  dst is opened for writing in
    binary mode.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for path in (src, dst):
        try:
            mode = os.stat(path).st_mode
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(mode):
            raise SpecialFileError("`%s` is a named pipe" % path)

    with open(src, 'rb') as source:
        with open(dst, 'wb') as target:
            copyfileobj(source, target)
def copymode(src, dst):
    """Copy the permission bits of src onto dst.

    Does nothing on platforms where os.chmod is not available.
    """
    if not hasattr(os, 'chmod'):
        return
    os.chmod(dst, stat.S_IMODE(os.stat(src).st_mode))
def copystat(src, dst):
    """Copy all stat info (mode bits, atime, mtime, flags) from src to dst.

    Each piece of information is copied only if the platform provides the
    corresponding os function (utime, chmod, chflags).  A failure to set
    the flags is ignored when the OS reports the operation as unsupported
    (errno EOPNOTSUPP or ENOTSUP); any other OSError is re-raised.
    """
    st = os.stat(src)
    mode = stat.S_IMODE(st.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (st.st_atime, st.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, mode)
    if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
        try:
            os.chflags(dst, st.st_flags)
        except OSError as why:
            # EOPNOTSUPP and ENOTSUP are distinct values on some platforms;
            # both mean "flags cannot be set here", so tolerate either.
            unsupported = [getattr(errno, code)
                           for code in ('EOPNOTSUPP', 'ENOTSUP')
                           if hasattr(errno, code)]
            if why.errno not in unsupported:
                raise
def copy(src, dst):
    """Copy file contents and permission bits, like "cp src dst".

    When dst names an existing directory, the file is created inside it
    under the base name of src.

    """
    target = dst
    if os.path.isdir(target):
        target = os.path.join(target, os.path.basename(src))
    copyfile(src, target)
    copymode(src, target)
def copy2(src, dst):
    """Copy file contents and all stat info, like "cp -p src dst".

    When dst names an existing directory, the file is created inside it
    under the base name of src.

    """
    target = dst
    if os.path.isdir(target):
        target = os.path.join(target, os.path.basename(src))
    copyfile(src, target)
    copystat(src, target)
def ignore_patterns(*patterns):
    """Build a callable suitable for the copytree() 'ignore' parameter.

    'patterns' are glob-style patterns; the returned callable reports the
    set of names matching at least one of them as names to exclude."""
    def _ignore_patterns(path, names):
        matched = set()
        for glob in patterns:
            matched.update(fnmatch.filter(names, glob))
        return matched
    return _ignore_patterns
def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy a directory tree using copy2().

    The destination directory must not already exist.
    If exception(s) occur, an Error is raised with a list of reasons;
    each reason is a (srcname, dstname, message) tuple.

    If the optional symlinks flag is true, symbolic links in the
    source tree result in symbolic links in the destination tree; if
    it is false, the contents of the files pointed to by symbolic
    links are copied.

    The optional ignore argument is a callable. If given, it
    is called with the `src` parameter, which is the directory
    being visited by copytree(), and `names` which is the list of
    `src` contents, as returned by os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be
    called once for each directory that is copied. It returns a
    list of names relative to the `src` directory that should
    not be copied.

    XXX Consider this example code rather than the ultimate tool.

    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst)
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if symlinks and os.path.islink(srcname):
                linkto = os.readlink(srcname)
                os.symlink(linkto, dstname)
            elif os.path.isdir(srcname):
                copytree(srcname, dstname, symlinks, ignore)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy2(srcname, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files.  Error must be tested first because
        # it is itself an EnvironmentError.
        except Error as err:
            errors.extend(err.args[0])
        except EnvironmentError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        if WindowsError is not None and isinstance(why, WindowsError):
            # Copying file access times may fail on Windows
            pass
        else:
            # Record one (src, dst, reason) tuple, matching the entries
            # collected in the loop above.
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
def rmtree(path, ignore_errors=False, onerror=None):
    """Recursively delete a directory tree.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is os.path.islink, os.listdir, os.remove,
    or os.rmdir; path is the argument to that function that caused it to
    fail; and exc_info is a tuple returned by sys.exc_info().  If
    ignore_errors is false and onerror is None, an exception is raised.

    Calling rmtree on a symbolic link is reported as an OSError through
    the same mechanism, and nothing is removed in that case.

    """
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Only ever called from inside an except block, so a bare
            # raise re-raises the error being handled.
            raise
    try:
        if os.path.islink(path):
            # symlinks to directories are forbidden, see bug #1669
            raise OSError("Cannot call rmtree on a symbolic link")
    except OSError:
        onerror(os.path.islink, path, sys.exc_info())
        # can't continue even if onerror hook returns
        return
    names = []
    try:
        names = os.listdir(path)
    except os.error:
        onerror(os.listdir, path, sys.exc_info())
    for name in names:
        fullname = os.path.join(path, name)
        try:
            mode = os.lstat(fullname).st_mode
        except os.error:
            # Treat an entry that cannot be stat'ed as a non-directory; the
            # os.remove() below then reports the problem through onerror.
            mode = 0
        if stat.S_ISDIR(mode):
            rmtree(fullname, ignore_errors, onerror)
        else:
            try:
                os.remove(fullname)
            except os.error:
                onerror(os.remove, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except os.error:
        onerror(os.rmdir, path, sys.exc_info())
256 def _basename(path):
257 # A basename() variant which first strips the trailing slash, if present.
258 # Thus we always get the last component of the path, even for directories.
259 return os.path.basename(path.rstrip(os.path.sep))
def move(src, dst):
    """Recursively move a file or directory to another location, similar
    to the Unix "mv" command.

    If dst is an existing directory (or a symlink to one), src is moved
    inside it under its own base name; Error is raised if that target
    already exists.

    If dst exists but is not a directory, it may be overwritten depending
    on os.rename() semantics.

    os.rename() is tried first.  If it fails with OSError, src is copied
    to the destination (copytree() for directories, copy2() otherwise)
    and then removed.  Moving a directory into itself raises Error.
    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    if os.path.isdir(dst):
        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    else:
        real_dst = dst

    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed: fall back to copy followed by delete.
        if not os.path.isdir(src):
            copy2(src, real_dst)
            os.unlink(src)
            return
        if _destinsrc(src, dst):
            raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
        copytree(src, real_dst, symlinks=True)
        rmtree(src)
295 def _destinsrc(src, dst):
296 src = abspath(src)
297 dst = abspath(dst)
298 if not src.endswith(os.path.sep):
299 src += os.path.sep
300 if not dst.endswith(os.path.sep):
301 dst += os.path.sep
302 return dst.startswith(src)
304 def _get_gid(name):
305 """Returns a gid, given a group name."""
306 if getgrnam is None or name is None:
307 return None
308 try:
309 result = getgrnam(name)
310 except KeyError:
311 result = None
312 if result is not None:
313 return result[2]
314 return None
316 def _get_uid(name):
317 """Returns an uid, given a user name."""
318 if getpwnam is None or name is None:
319 return None
320 try:
321 result = getpwnam(name)
322 except KeyError:
323 result = None
324 if result is not None:
325 return result[2]
326 return None
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", or None; any other
    value raises ValueError.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' + ".tar", possibly plus
    the appropriate compression extension (".gz", or ".bz2").  The
    directory that will hold it is created if it does not exist.

    If 'dry_run' is true, nothing is written to disk.  'logger', if given,
    receives informational messages.  'verbose' is accepted but not used.

    Returns the output filename.
    """
    tar_compression = {'gzip': 'gz', 'bzip2': 'bz2', None: ''}
    compress_ext = {'gzip': '.gz', 'bzip2': '.bz2'}

    if compress is not None and compress not in compress_ext:
        raise ValueError("bad value for 'compress': must be None, 'gzip' "
                         "or 'bzip2'")

    archive_name = base_name + '.tar' + compress_ext.get(compress, '')
    archive_dir = os.path.dirname(archive_name)

    # archive_dir is '' when base_name has no directory part: the archive
    # then goes into the current directory and there is nothing to create.
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    import tarfile  # late import so Python build itself doesn't break

    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # Override ownership only for the ids that could be resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression[compress])
        try:
            tar.add(base_dir, filter=_set_uid_gid)
        finally:
            tar.close()

    return archive_name
def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):
    """Create 'zip_filename' from 'base_dir' by spawning an external "zip"
    command; raises ExecError if the command fails."""
    # XXX see if we want to keep an external call here
    zipoptions = "-r" if verbose else "-rq"
    from distutils.errors import DistutilsExecError
    from distutils.spawn import spawn
    command = ["zip", zipoptions, zip_filename, base_dir]
    try:
        spawn(command, dry_run=dry_run)
    except DistutilsExecError:
        # XXX really should distinguish between "couldn't find
        # external 'zip' command" and "zip failed".
        raise ExecError(("unable to create zip file '%s': "
                         "could neither import the 'zipfile' module nor "
                         "find a standalone zip utility") % zip_filename)
406 def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
407 """Create a zip file from all the files under 'base_dir'.
409 The output zip file will be named 'base_dir' + ".zip". Uses either the
410 "zipfile" Python module (if available) or the InfoZIP "zip" utility
411 (if installed and found on the default search path). If neither tool is
412 available, raises ExecError. Returns the name of the output zip
413 file.
415 zip_filename = base_name + ".zip"
416 archive_dir = os.path.dirname(base_name)
418 if not os.path.exists(archive_dir):
419 if logger is not None:
420 logger.info("creating %s", archive_dir)
421 if not dry_run:
422 os.makedirs(archive_dir)
424 # If zipfile module is not available, try spawning an external 'zip'
425 # command.
426 try:
427 import zipfile
428 except ImportError:
429 zipfile = None
431 if zipfile is None:
432 _call_external_zip(base_dir, zip_filename, verbose, dry_run)
433 else:
434 if logger is not None:
435 logger.info("creating '%s' and adding '%s' to it",
436 zip_filename, base_dir)
438 if not dry_run:
439 zip = zipfile.ZipFile(zip_filename, "w",
440 compression=zipfile.ZIP_DEFLATED)
442 for dirpath, dirnames, filenames in os.walk(base_dir):
443 for name in filenames:
444 path = os.path.normpath(os.path.join(dirpath, name))
445 if os.path.isfile(path):
446 zip.write(path, path)
447 if logger is not None:
448 logger.info("adding '%s'", path)
449 zip.close()
451 return zip_filename
# Registry of known archive formats.  Each key is a format name and each
# value a tuple (function, extra_args, description), where extra_args is a
# list of (name, value) pairs passed to the function as keyword arguments.
_ARCHIVE_FORMATS = {
    'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
    'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
    'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
    'zip': (_make_zipfile, [], "ZIP file"),
}
def get_archive_formats():
    """Return the registered archive formats as a sorted list.

    Each element of the returned list is a tuple (name, description).
    """
    return sorted((name, info[2]) for name, info in _ARCHIVE_FORMATS.items())
def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, if extra_args is not a
    tuple or list, or if one of its elements is not a 2-item tuple or list.
    """
    if extra_args is None:
        extra_args = []
    # Use the builtin callable() rather than collections.Callable, which
    # is not available on every Python version.
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, extra_args, description)
def unregister_archive_format(name):
    """Remove the archive format 'name' from the registry.

    Raises KeyError if no such format is registered.
    """
    del _ARCHIVE_FORMATS[name]
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "bztar"
    or "gztar" (or any name added with register_archive_format()).  An
    unknown format raises ValueError.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    'verbose' is accepted but not forwarded to the archiving function.
    """
    # Validate the format before changing directory, so that an unknown
    # format cannot leave the process in 'root_dir'.
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format)

    save_cwd = os.getcwd()
    if root_dir is not None:
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        # Make base_name absolute first: it must still refer to the same
        # place after the chdir below.
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    if base_dir is None:
        base_dir = os.curdir

    kwargs = {'dry_run': dry_run, 'logger': logger}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if format != 'zip':
        kwargs['owner'] = owner
        kwargs['group'] = group

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if root_dir is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename