CHANGES: mention the documentation improvements and typofixes
[git-cola.git] / cola / core.py
blob1e2a9c8f59b146e638fd2940849107bf3eb0e8dd
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 from __future__ import absolute_import, division, print_function, unicode_literals
8 import functools
9 import itertools
10 import mimetypes
11 import os
12 import platform
13 import subprocess
14 import sys
16 from .decorators import interruptable
17 from .compat import ustr
18 from .compat import PY2
19 from .compat import PY3
20 from .compat import WIN32
# Process exit statuses, mirroring /usr/include/stdlib.h:
#   #define EXIT_SUCCESS 0  /* Successful exit status. */
#   #define EXIT_FAILURE 1  /* Failing exit status. */
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# Process exit statuses, mirroring /usr/include/sysexits.h:
#   #define EX_USAGE 64        /* command line usage error */
#   #define EX_NOINPUT 66      /* cannot open input */
#   #define EX_UNAVAILABLE 69  /* service unavailable */
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69

# Encoding used when nothing else is specified
ENCODING = 'utf-8'

# Not every file is UTF-8, and some are not in any text encoding at all.
# Git itself does not care about encodings (it stores binary data), so
# decode() tries these candidates in order until one succeeds.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """Unicode string that carries the encoding it was decoded from

    Strings of unknown encoding are decoded into UStr instances so that the
    encoding that succeeded is kept in the `encoding` attribute.  Patches
    must contain the original byte sequences, and the recorded encoding is
    what allows those bytes to be reconstructed later.

    """

    def __new__(cls, string, encoding):
        """Create a UStr from `string` tagged with `encoding`

        Raises ValueError when `string` is already a UStr that was tagged
        with a different encoding.
        """
        if isinstance(string, UStr):
            previous = string.encoding
            if encoding != previous:
                raise ValueError('Encoding conflict: %s vs. %s' % (previous, encoding))
            # Unwrap to a plain unicode string before re-wrapping
            string = ustr(string)

        instance = ustr.__new__(cls, string)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Decode `value` if it has a "decode" method, otherwise return it as-is"""
    if not hasattr(value, 'decode'):
        return value
    return value.decode(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """decode(encoded_string) returns an unencoded unicode string

    The result is a UStr that records which encoding was used.

    value: the object to decode.  None is returned as None.  A UStr is
        returned unchanged so that its recorded encoding is preserved.
        Any other unicode string is wrapped in a UStr tagged with ENCODING.
    encoding: an encoding to try before the entries of _encoding_tests.
        The special value 'bytes' returns non-unicode values undecoded.
    errors: the error handler passed to the "decode" method.

    When every candidate encoding fails, the value is decoded as ENCODING
    with undecodable bytes ignored.
    """
    if value is None:
        return None

    if isinstance(value, UStr):
        # Already decoded.  Re-wrapping with ENCODING would raise
        # "Encoding conflict" in UStr.__new__ whenever the string was
        # originally decoded with one of the fallback encodings.
        return value

    if isinstance(value, ustr):
        return UStr(value, ENCODING)

    if encoding == 'bytes':
        return value

    if encoding is None:
        encoding_tests = _encoding_tests
    else:
        encoding_tests = itertools.chain([encoding], _encoding_tests)

    for enc in encoding_tests:
        try:
            decoded = value.decode(enc, errors)
            return UStr(decoded, enc)
        except ValueError:
            # UnicodeDecodeError is a ValueError: try the next candidate
            continue

    # Last resort: drop whatever bytes cannot be decoded
    decoded = value.decode(ENCODING, errors='ignore')
    return UStr(decoded, ENCODING)
def encode(string, encoding=None):
    """encode(unencoded_string) returns an encoded byte string

    Unicode strings are encoded using `encoding`, or ENCODING when no
    encoding is given; characters that cannot be encoded are replaced.
    Values that are not unicode strings are returned unchanged.
    """
    if isinstance(string, ustr):
        return string.encode(encoding or ENCODING, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the form preferred by the current platform"""
    # The Windows API requires unicode strings regardless of python version,
    # whereas UNIX prefers bytes.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def list2cmdline(cmd):
    """Decode each argument and join them using subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open filename in binary mode and return its decoded contents"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Encode a unicode string and write it to the file at path"""
    with xopen(path, 'wb') as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read and decode data from a filehandle, retrying when interrupted"""
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode content and write it to a filehandle, retrying when interrupted"""
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Wait for a subprocess to finish, retrying when interrupted"""
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read and decode one line from a filehandle, retrying when interrupted"""
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    cmd: the command; a list of arguments, or a string when shell=True.
    cwd: the working directory for the child process.
    add_env: extra environment variables.  They are layered on top of the
        "env" keyword argument when one is given, otherwise on top of a
        copy of os.environ.
    universal_newlines: passed to Popen; also selects line buffering.
    stdin, stdout, stderr: passed to Popen; pipes by default.
    no_win32_startupinfo: when true, the CREATE_NO_WINDOW creation flag
        is not set on Windows.
    extra: additional keyword arguments for subprocess.Popen.

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Start from the caller's environment when one was supplied so that
        # it is not silently discarded in favor of os.environ.
        env = dict(os.environ if env is None else env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    bufsize = 1 if universal_newlines else -1
    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra
    )
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    A shell command is converted as a single string; otherwise each
    argument of the command is converted individually.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Call communicate() on a subprocess, retrying when interrupted"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The "encoding" keyword argument is used for decoding the output;
    the other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)

    # Missing or empty streams are reported as empty unicode strings
    output = decode(raw_output, encoding=encoding) or UStr('', ENCODING)
    errors = decode(raw_errors, encoding=encoding) or UStr('', ENCODING)
    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    argv = [encode(item) for item in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    args: the command and its arguments.  The sequence is copied, so the
        caller's list is never modified.
    cwd: the working directory for the child process.
    shell: passed to Popen; when false, the executable is first located
        using _win32_find_exe().

    Returns the pid of the detached process.
    """
    # Work on a copy: the executable lookup below assigns to args[0]
    args = list(args)

    # This is probably wrong, but it works.  Windows.. wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args = [sys.executable] + args

    if not shell:
        args[0] = _win32_find_exe(args[0])

    if PY3:
        # see comment in start_command()
        argv = [decode(arg) for arg in args]
    else:
        argv = [encode(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    Returns the first existing candidate, or `exe` unchanged when nothing
    is found.  Unset PATH or PATHEXT variables are treated as empty.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend([(exe + ext) for ext in extensions if ext.startswith('.')])
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' so that an unset PATH yields no directories
        # instead of failing on None.split()
        for path in getenv('PATH', '').split(os.pathsep):
            if path:
                for candidate in candidates:
                    full_path = os.path.join(path, candidate)
                    if exists(full_path):
                        return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: pick the fork() implementation for this platform
fork = _fork_win32 if sys.platform in ('win32', 'cygwin') else _fork_posix
def _decorator_noop(x):
    """Return the argument unchanged; the default decorator used by wrap()"""
    return x
def wrap(action, fn, decorator=None):
    """Wrap arguments with `action`, optionally decorate the result

    The returned function passes all of its arguments to `action`, calls
    `fn` with the single value that `action` returns, and then passes the
    result through `decorator` when one is provided.
    """
    post_process = _decorator_noop if decorator is None else decorator

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return post_process(fn(prepared))

    return wrapped
def decorate(decorator, fn):
    """Return a function that passes the result of `fn` through `decorator`"""

    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        raw_result = fn(*args, **kwargs)
        return decorator(raw_result)

    return decorated
def getenv(name, default=None):
    """Return the decoded value of an environment variable"""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def guess_mimetype(filename):
    """Robustly guess a filename's mimetype

    The filename is retried in encoded form when guessing raises a
    UnicodeEncodeError, and in decoded form when it raises any other
    TypeError or ValueError.
    """
    try:
        guessed = mimetypes.guess_type(filename)
    except UnicodeEncodeError:
        # Must be handled before ValueError, which is its base class
        guessed = mimetypes.guess_type(encode(filename))
    except (TypeError, ValueError):
        guessed = mimetypes.guess_type(decode(filename))
    return guessed[0]
def xopen(path, mode='r', encoding=None):
    """Open a file after converting its path with mkpath()"""
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
def print_stdout(msg, linesep='\n'):
    """Write a message followed by linesep to stdout"""
    text = msg + linesep
    # Python2 streams are written encoded bytes
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write a message followed by linesep to stderr"""
    text = msg + linesep
    # Python2 streams are written encoded bytes
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print a message to stderr and exit with the given status"""
    print_stderr(msg, linesep=linesep)
    sys.exit(status)
@interruptable
def node():
    """Return platform.node(), retrying when interrupted"""
    name = platform.node()
    return name
# Path helpers: arguments are converted by mkpath()/encode() before calling
# the os function, and path results are decoded back into unicode.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)

# getcwd() should return unicode on every python version
if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
# NOTE: find_executable() is originally from the stdlib's distutils.spawn.
# distutils is deprecated (PEP 632) and was removed from the stdlib in
# python3.12, so a local copy is kept here.
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    'path' is a string listing directories separated by 'os.pathsep'; it
    defaults to os.environ['PATH'], falling back to os.defpath when PATH
    is not set.  On win32, '.exe' is appended when 'executable' does not
    already end with it.

    Returns 'executable' itself when it names an existing file, otherwise
    the first matching file found in 'path', or None if not found.
    """
    if path is None:
        # os.environ['PATH'] would raise KeyError when PATH is unset
        path = os.environ.get('PATH', os.defpath)

    _, ext = os.path.splitext(executable)
    if (sys.platform == 'win32') and (ext != '.exe'):
        executable = executable + '.exe'

    if os.path.isfile(executable):
        return executable

    for directory in path.split(os.pathsep):
        candidate = os.path.join(directory, executable)
        if os.path.isfile(candidate):
            # the file exists, we have a shot at spawn working
            return candidate
    return None
# Python2 converts the argument with mkpath(); Python3 decodes it
find_executable = wrap(
    mkpath if PY2 else decode, _find_executable, decorator=decode
)
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
makedirs = wrap(mkpath, os.makedirs)

# os.readlink is not available everywhere; fall back to returning the path
if hasattr(os, 'readlink'):
    readlink = wrap(mkpath, os.readlink, decorator=decode)
else:

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)