requirements: ensure newer pytest to avoid pytest_cov travis errors
[git-cola.git] / cola / core.py
blob618c5fb7905fedd941dc551dcb4f17f98eb40e2c
"""This module provides core functions for handling unicode and UNIX quirks

The @interruptable functions retry when system calls are interrupted,
e.g. when python raises an IOError or OSError with errno == EINTR.

"""
7 from __future__ import division, absolute_import, unicode_literals
8 import os
9 import functools
10 import sys
11 import itertools
12 import platform
13 import subprocess
15 from .decorators import interruptable
16 from .compat import ustr
17 from .compat import PY2
18 from .compat import PY3
19 from .compat import WIN32
# Process exit statuses mirrored from /usr/include/stdlib.h
# #define EXIT_SUCCESS 0 /* Successful exit status. */
# #define EXIT_FAILURE 1 /* Failing exit status. */
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# Process exit statuses mirrored from /usr/include/sysexits.h
# #define EX_USAGE 64 /* command line usage error */
# #define EX_NOINPUT 66 /* cannot open input */
# #define EX_UNAVAILABLE 69 /* service unavailable */
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69
35 # Default encoding
36 ENCODING = 'utf-8'
38 # Some files are not in UTF-8; some other aren't in any codification.
39 # Remember that GIT doesn't care about encodings (saves binary data)
40 _encoding_tests = [
41 ENCODING,
42 'iso-8859-15',
43 'windows1252',
44 'ascii',
45 # <-- add encodings here
class UStr(ustr):
    """Unicode string that records the encoding it was decoded from

    Strings of unknown encoding are decoded into UStr instances so that the
    codec that succeeded is kept in the `encoding` attribute.  Generating
    patches requires the original byte sequences, and those can only be
    rebuilt later when the encoding used by decode() has been preserved.

    """

    def __new__(cls, string, encoding):
        """Create a UStr from `string` tagged with `encoding`

        Raises ValueError when `string` is already a UStr that carries a
        different encoding.
        """
        already_tagged = isinstance(string, UStr)
        if already_tagged and encoding != string.encoding:
            raise ValueError('Encoding conflict: %s vs. %s'
                             % (string.encoding, encoding))
        if already_tagged:
            # Drop the UStr wrapper before building the new instance
            string = ustr(string)

        instance = ustr.__new__(cls, string)
        instance.encoding = encoding
        return instance
def decode(value, encoding=None, errors='strict'):
    """Decode `value` into a unicode string that remembers its encoding

    :param value: bytes to decode; None and unicode strings are accepted too.
    :param encoding: optional codec to try before the module defaults.
    :param errors: error handling scheme passed to the codec for each attempt.
    :returns: None when `value` is None, otherwise a UStr whose `encoding`
        attribute names the codec that was used.

    A UStr is returned unchanged so that its recorded encoding survives
    repeated decode() calls.  Plain unicode strings are tagged with ENCODING.
    Bytes are tried against `encoding` (when given) and then each entry in
    _encoding_tests; if every attempt fails the bytes are decoded as ENCODING
    with undecodable sequences ignored.
    """
    if value is None:
        return None
    if isinstance(value, UStr):
        # Already decoded and tagged.  Re-wrapping it as ENCODING would raise
        # an "Encoding conflict" ValueError for any non-default encoding.
        return value
    if isinstance(value, ustr):
        return UStr(value, ENCODING)

    if encoding is None:
        candidates = _encoding_tests
    else:
        candidates = itertools.chain([encoding], _encoding_tests)

    for enc in candidates:
        try:
            return UStr(value.decode(enc, errors), enc)
        except (ValueError, LookupError):
            # ValueError covers UnicodeDecodeError; LookupError is raised for
            # codec names python does not know.  Either way, try the next one.
            continue

    # Last resort: never fail, drop whatever cannot be decoded
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """Encode a unicode string into bytes

    Unicode strings are encoded with `encoding`, or ENCODING when no encoding
    is given, replacing characters the codec cannot represent.  Any value
    that is not a unicode string is returned unchanged.
    """
    if isinstance(string, ustr):
        target = encoding or ENCODING
        return string.encode(target, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert `path` into the string type preferred by the platform"""
    # The Windows API requires unicode strings regardless of python version
    # while UNIX prefers bytes.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def list2cmdline(cmd):
    """Decode each argument and join them via subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Read up to `size` bytes from `filename` and return them decoded"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Encode `contents` and write it to `path`, returning xwrite()'s result"""
    with xopen(path, 'wb') as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read `size` bytes from a filehandle, decode them, retry on interrupt"""
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode `content` and write it to a filehandle, retry on interrupt"""
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Block until the subprocess finishes, retrying when interrupted

    Returns whatever `proc.wait()` returns.
    """
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read one line from a filehandle and return it decoded"""
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(cmd, cwd=None, add_env=None,
                  universal_newlines=False,
                  stdin=subprocess.PIPE,
                  stdout=subprocess.PIPE,
                  no_win32_startupinfo=False,
                  stderr=subprocess.PIPE,
                  **extra):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    :param cmd: argument list, or a single string when `shell=True` is passed.
    :param cwd: working directory for the child process.
    :param add_env: mapping of variables layered on top of the environment.
        The base is the `env` keyword when given, otherwise os.environ.
    :param universal_newlines: forwarded to subprocess.Popen.
    :param stdin, stdout, stderr: forwarded to subprocess.Popen.
    :param no_win32_startupinfo: when true, do not set the CREATE_NO_WINDOW
        creation flag on Windows.
    :param extra: remaining keyword arguments forwarded to subprocess.Popen;
        `env` and `shell` are inspected here.
    :returns: the subprocess.Popen object.

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Layer add_env on top of the caller's env when one was supplied
        # instead of silently discarding it in favor of os.environ.
        base_env = os.environ if env is None else env
        env = dict(base_env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    return subprocess.Popen(cmd, bufsize=1, stdin=stdin, stdout=stdout,
                            stderr=stderr, cwd=cwd, env=env,
                            universal_newlines=universal_newlines, **extra)
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    With `shell` set, `cmd` is converted as a single value; otherwise each
    element of `cmd` is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Call `proc.communicate()` and return its result, retrying on interrupt"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The `encoding` keyword selects the codec tried first when decoding the
    output; the other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)

    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    # Falsy streams (None or empty) are reported as empty unicode strings
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)

    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None):
    """Launch a process in the background and return its pid"""
    argv = [encode(arg) for arg in args]
    child = subprocess.Popen(argv, cwd=cwd)
    return child.pid
def _fork_win32(args, cwd=None):
    """Launch a detached background process on Windows and return its pid

    :param args: command and arguments; the caller's sequence is not modified.
    :param cwd: working directory for the child process.
    """
    # Work on a private copy so that resolving the executable below neither
    # mutates the caller's list nor fails when a tuple is passed in.
    args = list(args)

    # This is probably wrong, but it works.  Windows.. wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args = [sys.executable] + args
    args[0] = _win32_find_exe(args[0])

    if PY3:
        # see comment in start_command()
        argv = [decode(arg) for arg in args]
    else:
        argv = [encode(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(argv, cwd=cwd, creationflags=DETACHED_PROCESS).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    Returns the first existing candidate, or `exe` unchanged when nothing is
    found.  A missing PATH variable is treated as an empty search path.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend([(exe + ext) for ext in extensions
                           if ext.startswith('.')])
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' so that an unset PATH does not yield None here
        for path in getenv('PATH', '').split(os.pathsep):
            if not path:
                continue
            for candidate in candidates:
                full_path = os.path.join(path, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: pick the background-launch implementation that
# matches the running platform.
if sys.platform in ('win32', 'cygwin'):
    fork = _fork_win32
else:
    fork = _fork_posix
323 def _decorator_noop(x):
324 return x
def wrap(action, fn, decorator=None):
    """Wrap arguments with `action`, optionally decorate the result

    The returned callable passes all of its arguments to `action`, calls `fn`
    with the single value that `action` returns, and finally runs the result
    through `decorator` (a no-op when `decorator` is None).
    """
    finish = _decorator_noop if decorator is None else decorator

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return finish(fn(prepared))

    return wrapped
def decorate(decorator, fn):
    """Decorate the result of `fn` with `decorator`

    The returned callable forwards its arguments to `fn` untouched and passes
    the return value through `decorator`.
    """
    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        raw_result = fn(*args, **kwargs)
        return decorator(raw_result)

    return decorated
def getenv(name, default=None):
    """Look up an environment variable and return its decoded value

    The value of `default` is decoded and returned when `name` is not set.
    """
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def xopen(path, mode='r', encoding=None):
    """Open `path` after converting it with mkpath()

    `encoding` is only forwarded to mkpath() for converting the path; it is
    not passed to open().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
def print_stdout(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stdout

    The text is encoded to ENCODING first when running under Python2.
    """
    text = msg + linesep
    if PY2:
        text = encode(text, encoding=ENCODING)
    sys.stdout.write(text)
def print_stderr(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stderr

    The text is encoded to ENCODING first when running under Python2.
    """
    text = msg + linesep
    if PY2:
        text = encode(text, encoding=ENCODING)
    sys.stderr.write(text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print `msg` to stderr and exit the process with `status`"""
    print_stderr(msg, linesep=linesep)
    exit_status = status
    sys.exit(exit_status)
@interruptable
def node():
    """Return the value of platform.node(), retrying when interrupted"""
    hostname = platform.node()
    return hostname
# Path helpers: arguments are converted with mkpath()/encode() before the
# os function runs, and string results are decoded back to unicode.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)

# getcwd() must return unicode on every supported python version
if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
393 # NOTE: find_executable() is originally from the stdlib, but starting with
394 # python3.7 the stdlib no longer bundles distutils.
395 def _find_executable(executable, path=None):
396 """Tries to find 'executable' in the directories listed in 'path'.
398 A string listing directories separated by 'os.pathsep'; defaults to
399 os.environ['PATH']. Returns the complete filename or None if not found.
401 if path is None:
402 path = os.environ['PATH']
404 paths = path.split(os.pathsep)
405 _, ext = os.path.splitext(executable)
407 if (sys.platform == 'win32') and (ext != '.exe'):
408 executable = executable + '.exe'
410 if not os.path.isfile(executable):
411 for p in paths:
412 f = os.path.join(p, executable)
413 if os.path.isfile(f):
414 # the file exists, we have a shot at spawn working
415 return f
416 return None
418 return executable
# Python2 converts the argument with mkpath(), Python3 with decode(); the
# result is decoded in both cases.
find_executable = wrap(mkpath if PY2 else decode, _find_executable,
                       decorator=decode)

isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
makedirs = wrap(mkpath, os.makedirs)

if hasattr(os, 'readlink'):
    readlink = wrap(mkpath, os.readlink, decorator=decode)
else:

    def _readlink_noop(p):
        """Stand-in used when os.readlink is missing; returns `p` unchanged"""
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)