CHANGES: add a link to issue 890
[git-cola.git] / cola / core.py
blobf71cc4f67fa1fa352160f3ab2c8237f4bfcfda5d
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 from __future__ import division, absolute_import, unicode_literals
8 import functools
9 import itertools
10 import mimetypes
11 import os
12 import platform
13 import subprocess
14 import sys
16 from .decorators import interruptable
17 from .compat import ustr
18 from .compat import PY2
19 from .compat import PY3
20 from .compat import WIN32
# Exit statuses mirroring /usr/include/stdlib.h:
#   #define EXIT_SUCCESS 0  /* Successful exit status. */
#   #define EXIT_FAILURE 1  /* Failing exit status. */
EXIT_SUCCESS, EXIT_FAILURE = 0, 1

# Exit statuses mirroring /usr/include/sysexits.h:
#   #define EX_USAGE 64        /* command line usage error */
#   #define EX_NOINPUT 66      /* cannot open input */
#   #define EX_UNAVAILABLE 69  /* service unavailable */
EXIT_USAGE, EXIT_NOINPUT, EXIT_UNAVAILABLE = 64, 66, 69

# Default encoding
ENCODING = 'utf-8'

# Git stores raw bytes and does not care about encodings, so content is not
# guaranteed to be UTF-8, or to be in any well-defined encoding at all.
# These are the candidate encodings for decoding such content.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """Unicode string that carries the encoding it was decoded with

    Strings decoded from bytes of an unknown encoding are wrapped in UStr
    so that the `encoding` attribute records which encoding was used.
    Patches must contain the original byte sequences, so the encoding is
    kept next to the text and can be used later to rebuild those bytes.

    """

    def __new__(cls, string, encoding):
        """Create a UStr from `string`, tagged with `encoding`

        Raises ValueError when `string` is already a UStr that carries a
        different encoding.
        """
        text = string
        if isinstance(string, UStr):
            if encoding != string.encoding:
                raise ValueError(
                    'Encoding conflict: %s vs. %s' % (string.encoding, encoding)
                )
            # Build the new object from a plain unicode string
            text = ustr(string)

        instance = ustr.__new__(cls, text)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Return value.decode(encoding) for values that have a "decode" method

    Values without a "decode" method are returned unchanged.
    """
    if not hasattr(value, 'decode'):
        return value
    return value.decode(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """Decode bytes into a UStr that remembers the encoding that was used

    :param value: the value to decode.  None is returned as None, a UStr is
        returned unchanged, and any other unicode string is wrapped in a
        UStr tagged with the default ENCODING.
    :param encoding: encoding to try first.  The special value 'bytes'
        returns `value` undecoded.  The encodings in `_encoding_tests` are
        tried afterwards, in order.
    :param errors: error handling scheme passed to the decode() calls.
    :returns: a UStr whose `encoding` attribute names the encoding that
        succeeded.  When every candidate fails, the value is decoded with
        ENCODING while ignoring undecodable bytes.
    """
    if value is None:
        return None
    if isinstance(value, UStr):
        # Already decoded: keep the encoding it was decoded with.
        # Re-wrapping with ENCODING would raise an "Encoding conflict"
        # ValueError for any string that was decoded from non-UTF-8 bytes.
        return value
    if isinstance(value, ustr):
        return UStr(value, ENCODING)
    if encoding == 'bytes':
        return value

    if encoding is None:
        candidates = _encoding_tests
    else:
        candidates = itertools.chain([encoding], _encoding_tests)

    for candidate in candidates:
        try:
            decoded = value.decode(candidate, errors)
        except (ValueError, LookupError):
            # ValueError covers UnicodeDecodeError.  LookupError is raised
            # for an unknown encoding name, which should not prevent the
            # remaining candidates from being tried.
            continue
        return UStr(decoded, candidate)

    # No candidate decoded cleanly: drop the undecodable bytes
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """Encode a unicode string into bytes; other values pass through

    `encoding` is used when given, otherwise the default ENCODING (utf-8).
    The 'replace' error handler is used for characters that cannot be
    encoded.
    """
    if isinstance(string, ustr):
        codec = encoding or ENCODING
        return string.encode(codec, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the string type preferred by the platform"""
    # The Windows API requires unicode strings regardless of python version,
    # while UNIX prefers bytes
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def list2cmdline(cmd):
    """Decode each argument and join them using subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open filename in binary mode and return its decoded contents

    `size` is passed to the file's read(); `encoding` and `errors` are
    passed to decode().
    """
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Encode a unicode string and write it to the file at path

    Returns whatever the underlying file write() call returns.
    """
    with xopen(path, 'wb') as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle, decode the data, and retry when interrupted"""
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode content, write it to a filehandle, and retry when interrupted"""
    data = encode(content, encoding=encoding)
    return fh.write(data)
@interruptable
def wait(proc):
    """Return the result of proc.wait(), retrying when interrupted"""
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read one line from a filehandle and decode it, retrying when interrupted"""
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    :param cmd: the command to run: a sequence of arguments, or a single
        string when `shell=True` is passed through `extra`.
    :param cwd: working directory for the command.
    :param add_env: optional mapping of variables that are layered on top
        of the `env` keyword argument, or on top of os.environ when no
        `env` is given.
    :param universal_newlines: passed to Popen; line buffering is used
        when it is true.
    :param stdin: passed to Popen; a pipe by default.
    :param stdout: passed to Popen; a pipe by default.
    :param no_win32_startupinfo: when true, the CREATE_NO_WINDOW creation
        flag is not set on Windows.
    :param stderr: passed to Popen; a pipe by default.
    :param extra: remaining keyword arguments are passed to Popen.
    :returns: the subprocess.Popen object for the started command.

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Start from the caller's env when one was supplied so that it is
        # not silently discarded; otherwise start from our own environment.
        env = dict(os.environ if env is None else env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    bufsize = 1 if universal_newlines else -1
    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra
    )
def prep_for_subprocess(cmd, shell=False):
    """Convert cmd to unicode on Python3 and to bytes on Python2

    With shell=True `cmd` is converted as a single value; otherwise each
    of its elements is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Return the result of proc.communicate(), retrying when interrupted"""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run a command to completion and return (exit_code, output, errors)

    This is a simpler interface to the subprocess module.  The `encoding`
    keyword argument is used to decode the command's output; every other
    argument is passed on to start_command().  Empty or missing output and
    errors are returned as empty UStr values.

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    output = decode(raw_output, encoding=encoding) or UStr('', ENCODING)
    errors = decode(raw_errors, encoding=encoding) or UStr('', ENCODING)
    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid"""
    argv = [encode(arg) for arg in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    :param args: the command and its arguments; the sequence is copied and
        never modified.
    :param cwd: working directory for the new process.
    :param shell: passed to Popen; when false, the executable is resolved
        using _win32_find_exe() first.
    :returns: the pid of the new process.
    """
    # This is probably wrong, but it works.  Windows.. wow.
    # Work on a copy so that the caller's sequence is left untouched
    # (this also allows tuples to be passed in).
    args = list(args)
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args.insert(0, sys.executable)

    if not shell:
        args[0] = _win32_find_exe(args[0])

    # see comment in start_command()
    convert = decode if PY3 else encode
    argv = [convert(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    :param exe: the executable name or path to look up.
    :returns: the first existing candidate, or `exe` unchanged when nothing
        is found.
    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend(exe + ext for ext in extensions if ext.startswith('.'))
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # PATH may be unset; default to '' so there is always a string to
        # split instead of failing on None.
        for path in getenv('PATH', '').split(os.pathsep):
            if not path:
                continue
            for candidate in candidates:
                full_path = os.path.join(path, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: pick the fork implementation for this platform
fork = _fork_win32 if sys.platform in ('win32', 'cygwin') else _fork_posix
def _decorator_noop(x):
    """Return the argument unchanged"""
    return x


def wrap(action, fn, decorator=None):
    """Build a function that calls fn(action(...)), optionally decorated

    The returned function passes all of its arguments to `action`, calls
    `fn` with the single value `action` returned, and passes that result
    through `decorator` when one is given.
    """
    finish = _decorator_noop if decorator is None else decorator

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return finish(fn(prepared))

    return wrapped
def decorate(decorator, fn):
    """Build a function that passes the result of `fn` through `decorator`"""

    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        result = fn(*args, **kwargs)
        return decorator(result)

    return decorated
def getenv(name, default=None):
    """Look up an environment variable using os.getenv() and decode the result"""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def guess_mimetype(filename):
    """Guess a filename's mimetype, retrying with a converted filename

    The filename is retried in encoded form after a UnicodeEncodeError and
    in decoded form after any other TypeError or ValueError.
    """
    try:
        return mimetypes.guess_type(filename)[0]
    except UnicodeEncodeError:
        return mimetypes.guess_type(encode(filename))[0]
    except (TypeError, ValueError):
        return mimetypes.guess_type(decode(filename))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file after converting its path using mkpath()

    `encoding` applies to the path conversion only; it is not passed to
    open().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
def print_stdout(msg, linesep='\n'):
    """Write msg followed by linesep to stdout"""
    text = msg + linesep
    # The text is encoded before writing on Python2 only
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write msg followed by linesep to stderr"""
    text = msg + linesep
    # The text is encoded before writing on Python2 only
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print msg to stderr and exit with the given status"""
    print_stderr(msg, linesep=linesep)
    # sys.exit() raises SystemExit, so this function never returns normally
    sys.exit(status)
@interruptable
def node():
    """Return platform.node(), retrying when interrupted"""
    name = platform.node()
    return name
# os wrappers that convert their path argument using mkpath() (or encode())
# and, where a decorator is given, decode the result
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
# NOTE: find_executable() is originally from the stdlib, but starting with
# python3.7 the stdlib no longer bundles distutils.
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    A string listing directories separated by 'os.pathsep'; defaults to
    os.environ['PATH'], or to os.defpath when PATH is not set.
    Returns the complete filename or None if not found.

    On win32 '.exe' is appended to 'executable' unless it already has that
    extension.  An 'executable' that is itself an existing file is returned
    as-is without searching 'path'.
    """
    if path is None:
        # PATH can be unset; use the platform's default search path
        # instead of failing with a KeyError.
        path = os.environ.get('PATH', os.defpath)

    _, ext = os.path.splitext(executable)
    if sys.platform == 'win32' and ext != '.exe':
        executable = executable + '.exe'

    if os.path.isfile(executable):
        return executable

    for directory in path.split(os.pathsep):
        candidate = os.path.join(directory, executable)
        if os.path.isfile(candidate):
            # the file exists, we have a shot at spawn working
            return candidate
    return None
# Python2 converts the argument using mkpath(), Python3 decodes it
find_executable = wrap(
    mkpath if PY2 else decode, _find_executable, decorator=decode
)

# os wrappers that convert their path argument using mkpath() and,
# where a decorator is given, decode the result
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
makedirs = wrap(mkpath, os.makedirs)
try:
    readlink = wrap(mkpath, os.readlink, decorator=decode)
except AttributeError:
    # Without os.readlink, readlink() returns its argument unchanged

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)