core: python3 support for core.walk
[git-cola.git] / cola / core.py
blob52eca611f16f9d36a19daf20085b751badd4e66c
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 from __future__ import absolute_import, division, print_function, unicode_literals
8 import functools
9 import itertools
10 import mimetypes
11 import os
12 import platform
13 import subprocess
14 import sys
16 from .decorators import interruptable
17 from .compat import ustr
18 from .compat import PY2
19 from .compat import PY3
20 from .compat import WIN32
# Process exit statuses mirroring the C library definitions.
#
# /usr/include/stdlib.h
#   #define EXIT_SUCCESS 0 /* Successful exit status. */
#   #define EXIT_FAILURE 1 /* Failing exit status. */
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# /usr/include/sysexits.h
#   #define EX_USAGE 64 /* command line usage error */
#   #define EX_NOINPUT 66 /* cannot open input */
#   #define EX_UNAVAILABLE 69 /* service unavailable */
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69
# Default encoding used when no explicit encoding is requested.
ENCODING = 'utf-8'

# Not every file is UTF-8, and some files are not in any text encoding at all.
# Git itself does not care about encodings (it stores raw bytes), so decode()
# tries these candidates in order until one of them succeeds.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """A unicode string that carries the encoding it was decoded from

    Strings of unknown encoding are decoded into UStr instances so that the
    encoding that succeeded is available through the `encoding` attribute.
    Patch generation needs the original byte sequences, and keeping the
    encoding alongside the text is what allows those bytes to be
    reconstructed later.

    """

    def __new__(cls, string, encoding):
        """Create a UStr from `string` tagged with `encoding`

        Raises ValueError when `string` is already a UStr that was tagged
        with a different encoding.
        """
        if isinstance(string, UStr):
            previous = string.encoding
            if encoding != previous:
                message = 'Encoding conflict: %s vs. %s' % (previous, encoding)
                raise ValueError(message)
            # Strip the UStr wrapper so that a plain string is re-wrapped.
            string = ustr(string)

        instance = ustr.__new__(cls, string)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Return `value` decoded with `encoding` if it has a "decode" method

    Values without a "decode" method are handed back untouched.
    """
    if not hasattr(value, 'decode'):
        return value
    return value.decode(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """Decode an encoded string into a unicode string

    :param value: The value to decode. None is returned as None.
    :param encoding: Preferred encoding to try first. The special value
        'bytes' returns non-unicode values unchanged. When None, only the
        module-level default candidates are tried.
    :param errors: Error handler passed to each decode attempt.
    :returns: A UStr whose `encoding` attribute records the encoding that
        succeeded (None and 'bytes' values excepted).

    """
    if value is None:
        result = None
    elif isinstance(value, UStr):
        # Already decoded: keep the recorded encoding. Re-wrapping with
        # ENCODING would raise "Encoding conflict" for any string that was
        # originally decoded using one of the fallback encodings.
        result = value
    elif isinstance(value, ustr):
        result = UStr(value, ENCODING)
    elif encoding == 'bytes':
        result = value
    else:
        result = None
        if encoding is None:
            encoding_tests = _encoding_tests
        else:
            encoding_tests = itertools.chain([encoding], _encoding_tests)

        for enc in encoding_tests:
            try:
                decoded = value.decode(enc, errors)
                result = UStr(decoded, enc)
                break
            except (ValueError, LookupError):
                # ValueError covers UnicodeDecodeError; LookupError is raised
                # for codec names that python does not know about. Either way
                # move on to the next candidate.
                pass

        if result is None:
            # Nothing decoded cleanly: fall back to a lossy decode.
            decoded = value.decode(ENCODING, errors='ignore')
            result = UStr(decoded, ENCODING)

    return result
def encode(string, encoding=None):
    """Encode a unicode string into bytes, defaulting to utf-8

    Values that are not unicode strings are returned as-is. Characters that
    cannot be represented are handled using the 'replace' error handler.
    """
    if isinstance(string, ustr):
        target = encoding or ENCODING
        return string.encode(target, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the form preferred by the current platform"""
    if not WIN32:
        # UNIX prefers bytes
        return encode(path, encoding=encoding)
    # The Windows API requires unicode strings regardless of python version
    return decode(path, encoding=encoding)
def decode_seq(seq, encoding=None):
    """Return a list with every item of `seq` passed through decode()"""
    decoded = []
    for item in seq:
        decoded.append(decode(item, encoding=encoding))
    return decoded
def list2cmdline(cmd):
    """Decode each argument and join them using subprocess.list2cmdline()"""
    arguments = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(arguments)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None, append=False):
    """Encode a unicode string and write it to `path`

    The file is opened in binary mode; `append` selects appending over
    truncating. The result of the underlying write call is returned.
    """
    mode = 'ab' if append else 'wb'
    with xopen(path, mode) as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read up to `size` from a filehandle and decode the result

    Decorated with @interruptable so the call is retried when interrupted.
    """
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle

    Decorated with @interruptable so the call is retried when interrupted.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Block until a subprocess finishes and return the result of wait()

    Decorated with @interruptable so the call is retried when interrupted.
    """
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read a single line from a filehandle and decode it

    Decorated with @interruptable so the call is retried when interrupted.
    """
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    :param cmd: Command to run; a sequence of arguments, or a single string
        when `shell=True` is passed through `extra`.
    :param cwd: Working directory for the child process.
    :param add_env: Mapping of variables layered on top of the environment.
        The base is the `env` keyword argument when one is supplied and
        os.environ otherwise. Neither mapping is modified.
    :param universal_newlines: Passed to Popen; also selects line buffering.
    :param stdin, stdout, stderr: Passed to Popen, defaulting to pipes.
    :param no_win32_startupinfo: When True, do not set the CREATE_NO_WINDOW
        creation flag on Windows.
    :param extra: Additional keyword arguments forwarded to Popen.
    :returns: The subprocess.Popen object.

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Layer add_env over the caller's env rather than discarding it,
        # and copy so that neither os.environ nor the caller's mapping is
        # modified in place.
        env = dict(os.environ if env is None else env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess. Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying. The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    bufsize = 1 if universal_newlines else -1
    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra
    )
def prep_for_subprocess(cmd, shell=False):
    """Convert a command for subprocess: unicode on Python3, bytes on Python2

    With `shell` the command is converted as a single value; otherwise each
    argument is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Call communicate() on a subprocess and return its result

    Decorated with @interruptable so the call is retried when interrupted.
    """
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run a command until it exits and collect its decoded output

    The `encoding` keyword argument is consumed here and used to decode the
    output streams; everything else is forwarded to start_command().

    Returns a 3-tuple: (exit_code, output, errors). Empty or missing streams
    are reported as empty strings.

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)
    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Start a background process and return its pid"""
    argv = [encode(item) for item in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    :param args: Sequence of command arguments. The sequence is copied and
        never modified.
    :param cwd: Working directory for the child process.
    :param shell: Passed to Popen. When False the executable is resolved
        using _win32_find_exe().
    :returns: The pid of the launched process.

    """
    # Work on a copy: args[0] is replaced below and the caller's list must
    # not be changed as a side effect. This also accepts tuples.
    args = list(args)

    # This is probably wrong, but it works. Windows.. wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args = [sys.executable] + args

    if not shell:
        args[0] = _win32_find_exe(args[0])

    if PY3:
        # see comment in start_command()
        argv = [decode(arg) for arg in args]
    else:
        argv = [encode(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables. This allows us to avoid passing shell=True to subprocess.Popen.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    :param exe: Name or path of the executable to locate.
    :returns: The first existing candidate, or `exe` unchanged when nothing
        is found.

    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend([(exe + ext) for ext in extensions if ext.startswith('.')])
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' so that an unset PATH means "no directories to
        # search" rather than an AttributeError on None.
        for path in getenv('PATH', '').split(os.pathsep):
            if path:
                for candidate in candidates:
                    full_path = os.path.join(path, candidate)
                    if exists(full_path):
                        return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers
# fork() launches a background process using the platform's implementation.
fork = _fork_win32 if sys.platform in {'win32', 'cygwin'} else _fork_posix
def _decorator_noop(x):
    """Identity function: hand back `x` unchanged

    Used by wrap() as the default decorator.
    """
    return x
def wrap(action, func, decorator=None):
    """Build a function that feeds `action(*args, **kwargs)` into `func`

    The result of `func` is passed through `decorator` when one is given and
    is returned as-is otherwise.
    """
    finish = _decorator_noop if decorator is None else decorator

    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return finish(func(prepared))

    return functools.update_wrapper(wrapped, func)
def decorate(decorator, func):
    """Build a function that passes the result of `func` through `decorator`"""

    def decorated(*args, **kwargs):
        outcome = func(*args, **kwargs)
        return decorator(outcome)

    return functools.update_wrapper(decorated, func)
def getenv(name, default=None):
    """Look up an environment variable and decode its value"""
    raw = os.getenv(name, default)
    return decode(raw)
def guess_mimetype(filename):
    """Guess the mimetype for a filename, retrying on string-type errors

    The filename is first passed as-is. It is retried in encoded form after a
    UnicodeEncodeError and in decoded form after a TypeError or ValueError.
    """
    try:
        return mimetypes.guess_type(filename)[0]
    except UnicodeEncodeError:
        return mimetypes.guess_type(encode(filename))[0]
    except (TypeError, ValueError):
        return mimetypes.guess_type(decode(filename))[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` using `mode` after converting the path with mkpath()

    mkpath() decodes the path into unicode on Windows and encodes it into
    bytes on Unix. `encoding` applies to the path conversion only.

    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Return a utf-8 text-mode filehandle that appends to `path`

    `encoding` is used for converting the path only.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Return a utf-8 text-mode filehandle that reads from `path`

    `encoding` is used for converting the path only.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'rt', encoding='utf-8')
def open_write(path, encoding=None):
    """Return a utf-8 text-mode filehandle that writes to `path`

    `encoding` is used for converting the path only.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'wt', encoding='utf-8')
def print_stdout(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stdout

    The text is encoded first when running on Python2.
    """
    text = msg + linesep
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stderr

    The text is encoded first when running on Python2.
    """
    text = msg + linesep
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Report `msg` on stderr, then exit the process with `status`"""
    print_stderr(msg, linesep=linesep)
    # sys.exit() raises SystemExit, so nothing after this line runs.
    sys.exit(status)
@interruptable
def node():
    """Return the value of platform.node()

    Decorated with @interruptable so the call is retried when interrupted.
    """
    name = platform.node()
    return name
# Wrappers that convert path arguments before calling into the os module
# and, where a path comes back, decode the result.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)

# getcwd() variant selection depends on the python version.
if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
464 # NOTE: find_executable() is originally from the stdlib, but starting with
465 # python3.7 the stdlib no longer bundles distutils.
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    :param executable: File name or path of the program to locate. On win32
        '.exe' is appended when the name does not already end with it.
    :param path: A string listing directories separated by 'os.pathsep';
        defaults to os.environ['PATH'], or os.defpath when PATH is unset.
    :returns: `executable` itself when it already names an existing file,
        otherwise the first match found in `path`, or None if not found.

    """
    if path is None:
        # Fall back to the platform default search path so that an unset
        # PATH does not raise KeyError.
        path = os.environ.get('PATH', os.defpath)

    paths = path.split(os.pathsep)
    _, ext = os.path.splitext(executable)

    if (sys.platform == 'win32') and (ext != '.exe'):
        executable = executable + '.exe'

    if not os.path.isfile(executable):
        for dirname in paths:
            filename = os.path.join(dirname, executable)
            if os.path.isfile(filename):
                # the file exists, we have a shot at spawn working
                return filename
        return None

    return executable
def sync():
    """Call os.sync() when the platform provides it; otherwise do nothing"""
    flush = getattr(os, 'sync', None)
    if flush is not None:
        flush()
def rename(old, new):
    """Rename `old` to `new`

    Both paths go through mkpath() so that non-ascii file paths are handled.
    """
    source = mkpath(old)
    target = mkpath(new)
    os.rename(source, target)
# find_executable() converts its argument with mkpath() on Python2 and with
# decode() on Python3; the result is decoded in both cases.
find_executable = wrap(
    mkpath if PY2 else decode, _find_executable, decorator=decode
)
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)

if hasattr(os, 'readlink'):
    readlink = wrap(mkpath, os.readlink, decorator=decode)
else:
    # No os.readlink available: hand the path back unchanged.
    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
# os.walk is used directly on Python3; only Python2 converts the path first.
walk = wrap(mkpath, os.walk) if PY2 else os.walk