doc: update v3.3 release notes draft
[git-cola.git] / cola / core.py
blob1b25b0a21f5707bf93d2641547e593038f67ae78
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 from __future__ import division, absolute_import, unicode_literals
8 import os
9 import functools
10 import sys
11 import itertools
12 import platform
13 import subprocess
15 from .decorators import interruptable
16 from .compat import ustr
17 from .compat import PY2
18 from .compat import PY3
19 from .compat import WIN32
21 # /usr/include/stdlib.h
22 # #define EXIT_SUCCESS 0 /* Successful exit status. */
23 # #define EXIT_FAILURE 1 /* Failing exit status. */
24 EXIT_SUCCESS = 0
25 EXIT_FAILURE = 1
27 # /usr/include/sysexits.h
28 # #define EX_USAGE 64 /* command line usage error */
29 # #define EX_NOINPUT 66 /* cannot open input */
30 # #define EX_UNAVAILABLE 69 /* service unavailable */
31 EXIT_USAGE = 64
32 EXIT_NOINPUT = 66
33 EXIT_UNAVAILABLE = 69
35 # Default encoding
36 ENCODING = 'utf-8'
38 # Some files are not in UTF-8; some other aren't in any codification.
39 # Remember that GIT doesn't care about encodings (saves binary data)
40 _encoding_tests = [
41 ENCODING,
42 'iso-8859-15',
43 'windows1252',
44 'ascii',
45 # <-- add encodings here
class UStr(ustr):
    """Unicode string subclass that carries an `encoding` attribute

    decode() returns UStr instances when it decodes bytes whose encoding
    is not known in advance.  Keeping the name of the codec that worked
    next to the text makes it possible to reconstruct the original byte
    sequence later, e.g. when generating patches.

    """

    def __new__(cls, string, encoding):
        """Build a UStr from `string`, tagging it with `encoding`

        Raises ValueError when `string` is already a UStr that was tagged
        with a different encoding.
        """
        already_tagged = isinstance(string, UStr)
        if already_tagged and encoding != string.encoding:
            raise ValueError('Encoding conflict: %s vs. %s'
                             % (string.encoding, encoding))
        # Unwrap to the plain unicode type before building the new object
        text = ustr(string) if already_tagged else string

        instance = ustr.__new__(cls, text)
        instance.encoding = encoding
        return instance
def decode(value, encoding=None, errors='strict'):
    """decode(encoded_string) returns an unencoded unicode string

    :param value: bytes to decode, an already-decoded unicode string,
        or None.
    :param encoding: optional codec name to try before the codecs listed
        in `_encoding_tests`.
    :param errors: error handler passed to each decode attempt.
    :returns: None when `value` is None, otherwise a UStr whose
        `encoding` attribute names the codec that was used.

    If every candidate codec fails, the bytes are decoded as ENCODING
    with undecodable bytes dropped.
    """
    if value is None:
        return None

    if isinstance(value, UStr):
        # Already decoded: keep the codec it was decoded with.  Tagging it
        # with ENCODING would raise UStr's "Encoding conflict" ValueError
        # for strings that were decoded using a fallback codec.
        return UStr(value, value.encoding)

    if isinstance(value, ustr):
        return UStr(value, ENCODING)

    if encoding is None:
        candidates = _encoding_tests
    else:
        candidates = itertools.chain([encoding], _encoding_tests)

    for enc in candidates:
        try:
            return UStr(value.decode(enc, errors), enc)
        except (ValueError, LookupError):
            # ValueError covers UnicodeDecodeError (its subclass);
            # LookupError is what an unknown codec name raises.  Either
            # way, move on to the next candidate.
            continue

    # Last resort: drop whatever cannot be decoded
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """encode(unencoded_string) returns a string encoded in utf-8

    `encoding` overrides the default ENCODING codec.  Characters that the
    codec cannot represent are substituted ('replace' error handler).
    Values that are not unicode strings are returned unchanged.
    """
    if isinstance(string, ustr):
        codec = encoding or ENCODING
        return string.encode(codec, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert `path` into the string type preferred by the platform"""
    # The Windows API requires unicode strings regardless of python version,
    # while UNIX prefers bytes.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def list2cmdline(cmd):
    """Decode each argument and join them via subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents

    `size`, `encoding` and `errors` are forwarded to xread().
    """
    fh = xopen(filename, 'rb')
    with fh:
        contents = xread(fh, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Encode `contents` and write it to `path`, opened in binary mode

    Returns whatever xwrite() returns.
    """
    fh = xopen(path, 'wb')
    with fh:
        result = xwrite(fh, contents, encoding=encoding)
    return result
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read `size` from a filehandle, decode it, and retry when interrupted"""
    raw = fh.read(size)
    return decode(raw, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode `content`, write it to a filehandle, retry when interrupted"""
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Block until the subprocess finishes; retry when interrupted

    Returns the value of `proc.wait()`.
    """
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read one line from a filehandle and return it decoded"""
    raw_line = fh.readline()
    return decode(raw_line, encoding=encoding)
@interruptable
def start_command(cmd, cwd=None, add_env=None,
                  universal_newlines=False,
                  stdin=subprocess.PIPE,
                  stdout=subprocess.PIPE,
                  no_win32_startupinfo=False,
                  stderr=subprocess.PIPE,
                  **extra):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    :param cmd: argument list, or a single command string when
        `shell=True` is passed through `extra`.
    :param cwd: working directory for the child process.
    :param add_env: mapping of variables layered on top of the base
        environment.  The base is the `env` keyword when one is given,
        otherwise a copy of `os.environ`.
    :param universal_newlines: forwarded to subprocess.Popen.
    :param stdin: forwarded to subprocess.Popen.
    :param stdout: forwarded to subprocess.Popen.
    :param stderr: forwarded to subprocess.Popen.
    :param no_win32_startupinfo: when true, the CREATE_NO_WINDOW creation
        flag is not set on Windows.
    :param extra: remaining keyword arguments for subprocess.Popen.
    :returns: the subprocess.Popen object.
    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Start from the caller's explicit env when there is one so that
        # it is not silently discarded; otherwise inherit our environment.
        base_env = os.environ if env is None else env
        env = dict(base_env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    return subprocess.Popen(cmd, bufsize=1, stdin=stdin, stdout=stdout,
                            stderr=stderr, cwd=cwd, env=env,
                            universal_newlines=universal_newlines, **extra)
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    With `shell` set, `cmd` is converted as a single value; otherwise each
    element of `cmd` is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Call `proc.communicate()` and return its result; retry on interrupt"""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The `encoding` keyword, when present, is used to decode the output;
    the other arguments are passed on to start_command().
    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)

    output = decode(raw_output, encoding=encoding)
    if not output:
        # Covers both None and empty output
        output = UStr('', ENCODING)

    errors = decode(raw_errors, encoding=encoding)
    if not errors:
        errors = UStr('', ENCODING)

    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    encoded_args = []
    for arg in args:
        encoded_args.append(encode(arg))
    child = subprocess.Popen(encoded_args, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    :param args: sequence of command-line arguments; it is not modified.
    :param cwd: working directory for the child process.
    :param shell: forwarded to subprocess.Popen.
    :returns: the pid of the launched process.
    """
    # Work on a private list: the first element is replaced below, which
    # must not leak into the caller's sequence, and tuples should work too.
    args = list(args)

    # This is probably wrong, but it works.  Windows.. wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args.insert(0, sys.executable)
    args[0] = _win32_find_exe(args[0])

    if PY3:
        # see comment in start_command()
        argv = [decode(arg) for arg in args]
    else:
        argv = [encode(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(argv, cwd=cwd, creationflags=DETACHED_PROCESS,
                            shell=shell).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    :param exe: executable name or path.
    :returns: the first existing candidate, or `exe` unchanged when
        nothing was found.
    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend([(exe + ext) for ext in extensions
                           if ext.startswith('.')])
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' (as done for PATHEXT above) so that an unset PATH
        # yields no directories instead of failing on None.split().
        for path in getenv('PATH', '').split(os.pathsep):
            if not path:
                continue
            for candidate in candidates:
                full_path = os.path.join(path, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: `fork` launches a background process using the
# implementation that matches the current platform.
fork = _fork_win32 if sys.platform in ('win32', 'cygwin') else _fork_posix
324 def _decorator_noop(x):
325 return x
def wrap(action, fn, decorator=None):
    """Return a function that preprocesses its arguments with `action`

    The returned callable passes all of its arguments to `action`, calls
    `fn` with the single value `action` returned, and finally runs that
    result through `decorator` when one was supplied.
    """
    finish = _decorator_noop if decorator is None else decorator

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        outcome = fn(prepared)
        return finish(outcome)

    return wrapped
def decorate(decorator, fn):
    """Return a function that passes the result of `fn` to `decorator`"""

    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        outcome = fn(*args, **kwargs)
        return decorator(outcome)

    return decorated
def getenv(name, default=None):
    """Look up an environment variable and return its decoded value"""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def xopen(path, mode='r', encoding=None):
    """Open `path` after converting it with mkpath()

    `encoding` is only used for the path conversion.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
def print_stdout(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stdout

    The text is encoded using ENCODING first when running on Python2.
    """
    text = msg + linesep
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stderr

    The text is encoded using ENCODING first when running on Python2.
    """
    text = msg + linesep
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print `msg` to stderr, then exit the process with `status`"""
    print_stderr(msg, linesep=linesep)
    # sys.exit() raises SystemExit, so nothing after this line runs
    sys.exit(status)
@interruptable
def node():
    """Return platform.node(); retry when interrupted"""
    name = platform.node()
    return name
# Path helpers built with wrap(): the arguments are converted by the first
# argument (mkpath or encode) before the os function is called, and the
# result is passed through `decorator` when one is given.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
# getcwd: on Python2 use os.getcwdu when it is available, otherwise decode
# the result of os.getcwd.  On Python3 os.getcwd is used directly.
if PY2:
    if hasattr(os, 'getcwdu'):
        # pylint: disable=no-member
        getcwd = os.getcwdu
    else:
        getcwd = decorate(decode, os.getcwd)
else:
    getcwd = os.getcwd
394 # NOTE: find_executable() is originally from the stdlib, but starting with
395 # python3.7 the stdlib no longer bundles distutils.
396 def _find_executable(executable, path=None):
397 """Tries to find 'executable' in the directories listed in 'path'.
399 A string listing directories separated by 'os.pathsep'; defaults to
400 os.environ['PATH']. Returns the complete filename or None if not found.
402 if path is None:
403 path = os.environ['PATH']
405 paths = path.split(os.pathsep)
406 _, ext = os.path.splitext(executable)
408 if (sys.platform == 'win32') and (ext != '.exe'):
409 executable = executable + '.exe'
411 if not os.path.isfile(executable):
412 for p in paths:
413 f = os.path.join(p, executable)
414 if os.path.isfile(f):
415 # the file exists, we have a shot at spawn working
416 return f
417 return None
419 return executable
# find_executable: the argument is converted with mkpath on Python2 and with
# decode on Python3 before _find_executable runs; the result is decoded.
if PY2:
    find_executable = wrap(mkpath, _find_executable, decorator=decode)
else:
    find_executable = wrap(decode, _find_executable, decorator=decode)
# More os / os.path wrappers whose path argument is converted by mkpath
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
makedirs = wrap(mkpath, os.makedirs)
try:
    readlink = wrap(mkpath, os.readlink, decorator=decode)
except AttributeError:
    # The os module has no readlink attribute here: fall back to a
    # function that returns its argument unchanged.

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)