CHANGES: update v4.4.2 release notes draft for #1368
[git-cola.git] / cola / core.py
blob28e2a382424655b3cdff5f3f98117a162cf10ade
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 import functools
8 import itertools
9 import mimetypes
10 import os
11 import platform
12 import subprocess
13 import sys
15 from .decorators import interruptable
16 from .compat import ustr
17 from .compat import PY2
18 from .compat import PY3
19 from .compat import WIN32
# Process exit codes, mirroring /usr/include/stdlib.h
# #define EXIT_SUCCESS 0 /* Successful exit status. */
# #define EXIT_FAILURE 1 /* Failing exit status. */
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# Process exit codes, mirroring /usr/include/sysexits.h
# #define EX_USAGE 64 /* command line usage error */
# #define EX_NOINPUT 66 /* cannot open input */
# #define EX_UNAVAILABLE 69 /* service unavailable */
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69

# Default encoding
ENCODING = 'utf-8'

# Some files are not in UTF-8, and some are not in any well-defined encoding.
# Remember that Git does not care about encodings (it stores binary data).
# decode() tries these encodings in the order listed here.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """Unicode string that remembers the encoding it was decoded from

    Instances behave like regular unicode strings and additionally carry an
    `encoding` attribute.  UStr is used when decoding strings of an unknown
    encoding: in order to generate patches that contain the original byte
    sequences, the encoding that was used by decode() has to be preserved
    so that the original bytes can be reconstructed later.

    Wrapping an existing UStr with a different encoding raises ValueError.

    """

    def __new__(cls, string, encoding):
        already_wrapped = isinstance(string, UStr)
        if already_wrapped and encoding != string.encoding:
            raise ValueError(f'Encoding conflict: {string.encoding} vs. {encoding}')
        # Strip the UStr wrapper so that the new object is built from a
        # plain unicode string.
        text = ustr(string) if already_wrapped else string

        instance = ustr.__new__(cls, text)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Return value.decode(encoding) when value has a "decode" method

    Values without a "decode" method are returned unchanged.
    """
    if not hasattr(value, 'decode'):
        return value
    return value.decode(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """Decode an encoded byte string into a UStr

    The requested `encoding` (when given) is tried first, followed by each
    entry of `_encoding_tests`.  The first encoding that decodes without
    raising ValueError wins and is recorded on the returned UStr.  When none
    succeed the value is decoded as ENCODING with undecodable bytes ignored.

    Special cases:
    - None is returned as None.
    - A UStr is returned unchanged so that its recorded encoding is kept.
    - Any other unicode string is wrapped in a UStr tagged with ENCODING.
    - encoding='bytes' returns the (non-unicode) value untouched.
    """
    if value is None:
        return None
    if isinstance(value, UStr):
        # Already decoded.  Re-wrapping with ENCODING would raise an
        # "Encoding conflict" ValueError for values that were decoded using
        # one of the fallback encodings, so hand it back as-is.
        return value
    if isinstance(value, ustr):
        return UStr(value, ENCODING)
    if encoding == 'bytes':
        return value

    if encoding is None:
        encoding_tests = _encoding_tests
    else:
        encoding_tests = itertools.chain([encoding], _encoding_tests)

    for enc in encoding_tests:
        try:
            return UStr(value.decode(enc, errors), enc)
        except ValueError:
            # UnicodeDecodeError is a ValueError: try the next candidate
            continue

    # Last resort: drop whatever cannot be decoded
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """Encode a unicode string into bytes

    The `encoding` defaults to ENCODING and characters that cannot be
    encoded are replaced.  Values that are not unicode strings are returned
    unchanged.
    """
    if isinstance(string, ustr):
        return string.encode(encoding or ENCODING, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the form preferred by the current platform"""
    # The Windows API requires unicode strings regardless of python version
    # while UNIX prefers bytes.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def decode_seq(seq, encoding=None):
    """Return a list containing the decoded form of each item in seq"""
    return [decode(item, encoding=encoding) for item in seq]
def list2cmdline(cmd):
    """Decode each argument and join them using subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Read up to `size` bytes from filename and return the decoded contents"""
    with xopen(filename, 'rb') as stream:
        contents = xread(stream, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None, append=False):
    """Write a unicode string to a file, optionally appending to it

    Returns the result of the underlying write call.
    """
    mode = 'ab' if append else 'wb'
    with xopen(path, mode) as stream:
        return xwrite(stream, contents, encoding=encoding)
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read from a filehandle, decode the data, and retry when interrupted"""
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode content, write it to a filehandle, and retry when interrupted"""
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Wait for a subprocess to finish and retry when interrupted

    Returns the value returned by proc.wait().
    """
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read one line from a filehandle and return it decoded"""
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra,
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    cmd: the command; a sequence of arguments, or a single string when
        shell=True is passed through `extra`.
    cwd: working directory for the child process.
    add_env: mapping of variables layered on top of the environment.  The
        base is the `env` keyword argument when one was provided, otherwise
        a copy of os.environ.
    universal_newlines: forwarded to Popen; also enables line buffering.
    stdin, stdout, stderr: forwarded to Popen; pipes by default.
    no_win32_startupinfo: when true, the CREATE_NO_WINDOW creation flag is
        not set on Windows.
    extra: any further keyword arguments are forwarded to subprocess.Popen.
    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Do not throw away an explicitly provided env: use it as the base
        # and only fall back to the current environment when it is absent.
        env = os.environ.copy() if env is None else dict(env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        extra['startupinfo'] = startupinfo

    if WIN32 and not no_win32_startupinfo:
        CREATE_NO_WINDOW = 0x08000000
        extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    bufsize = 1 if universal_newlines else -1
    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra,
    )
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    With shell=True `cmd` is converted as a single value, otherwise each
    element of `cmd` is converted and a list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Call proc.communicate() and retry when interrupted"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The `encoding` keyword argument is consumed here and used for decoding
    the output; the other arguments are passed on to start_command().
    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    # Empty or missing streams are reported as empty strings
    output = decode(raw_output, encoding=encoding) or UStr('', ENCODING)
    errors = decode(raw_errors, encoding=encoding) or UStr('', ENCODING)
    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    argv = [encode(arg) for arg in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    Returns the pid of the detached process.  The `args` sequence provided
    by the caller is never modified.
    """
    # Work on a private copy: the executable lookup below assigns to
    # args[0], which must not leak into the caller's list, and the copy
    # also allows tuples to be passed in.
    args = list(args)

    # This is probably wrong, but it works.  Windows.. wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args.insert(0, sys.executable)

    if not shell:
        args[0] = _win32_find_exe(args[0])

    # see comment in start_command()
    convert = decode if PY3 else encode
    argv = [convert(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    Returns the first existing candidate, or `exe` unchanged when nothing
    was found.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend([(exe + ext) for ext in extensions if ext.startswith('.')])
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to an empty PATH so that an unset variable results in
        # "not found" rather than an AttributeError on None.
        for path in getenv('PATH', '').split(os.pathsep):
            if not path:
                continue
            for candidate in candidates:
                full_path = os.path.join(path, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: pick the background-launch implementation once,
# based on the platform reported by the interpreter.
fork = _fork_win32 if sys.platform in {'win32', 'cygwin'} else _fork_posix
def _decorator_noop(x):
    """Identity function used by wrap() when no decorator is requested"""
    return x
def wrap(action, func, decorator=None):
    """Wrap arguments with `action`, optionally decorate the result

    The returned callable passes all of its arguments to `action`, calls
    `func` with the single value that `action` returned, and finally passes
    the result of `func` through `decorator` (when one is provided).
    """
    post_process = _decorator_noop if decorator is None else decorator

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return post_process(func(prepared))

    return wrapped
def decorate(decorator, func):
    """Decorate the result of `func` with `decorator`

    The returned callable forwards its arguments to `func` unchanged and
    returns decorator(result).
    """

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        result = func(*args, **kwargs)
        return decorator(result)

    return decorated
def getenv(name, default=None):
    """Return the decoded value of an environment variable

    The `default` is used (and decoded) when the variable is not set.
    """
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def guess_mimetype(filename):
    """Robustly guess a filename's mimetype

    The filename is first passed to mimetypes.guess_type() as-is.  The guess
    is retried with the encoded filename after a UnicodeEncodeError and with
    the decoded filename after any other TypeError or ValueError.
    """
    try:
        return mimetypes.guess_type(filename)[0]
    except UnicodeEncodeError:
        return mimetypes.guess_type(encode(filename))[0]
    except (TypeError, ValueError):
        return mimetypes.guess_type(decode(filename))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file with the specified mode

    The path is decoded into unicode on Windows and encoded into bytes on Unix.
    The `encoding` is only used for converting the path; it is not passed
    to open().
    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Open a file for appending in utf-8 text mode

    The `encoding` only applies to the conversion of the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Open a file for reading in utf-8 text mode

    The `encoding` only applies to the conversion of the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, encoding='utf-8')
def open_write(path, encoding=None):
    """Open a file for writing in utf-8 text mode

    The `encoding` only applies to the conversion of the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'w', encoding='utf-8')
def print_stdout(msg, linesep='\n'):
    """Write msg followed by linesep to stdout"""
    text = msg + linesep
    if PY2:
        # Python2 streams expect encoded bytes
        text = encode(text, encoding=ENCODING)
    sys.stdout.write(text)
def print_stderr(msg, linesep='\n'):
    """Write msg followed by linesep to stderr"""
    text = msg + linesep
    if PY2:
        # Python2 streams expect encoded bytes
        text = encode(text, encoding=ENCODING)
    sys.stderr.write(text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print msg to stderr and exit the process with the given status"""
    print_stderr(msg, linesep=linesep)
    sys.exit(status)
@interruptable
def node():
    """Return platform.node() and retry when interrupted"""
    name = platform.node()
    return name
# Path helpers: each wrapper converts its argument before calling the os
# function and, where a decorator is given, decodes the result.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
# getcwd: on Python2 prefer os.getcwdu() when it exists, otherwise decode
# the result of os.getcwd().  Python3 uses os.getcwd() directly.
if PY2:
    if hasattr(os, 'getcwdu'):
        # pylint: disable=no-member
        getcwd = os.getcwdu
    else:
        getcwd = decorate(decode, os.getcwd)
else:
    getcwd = os.getcwd
# NOTE: find_executable() is originally from the stdlib, but starting with
# python3.7 the stdlib no longer bundles distutils.
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    'path' is a string listing directories separated by 'os.pathsep'; it
    defaults to os.environ['PATH'], or to os.defpath when PATH is not set.
    On win32 '.exe' is appended to names that do not already end with it.
    Returns the complete filename or None if not found.
    """
    if path is None:
        # PATH can be missing from the environment: fall back to the
        # platform's default search path instead of raising KeyError.
        path = os.environ.get('PATH', os.defpath)

    _, ext = os.path.splitext(executable)
    if sys.platform == 'win32' and ext != '.exe':
        executable = executable + '.exe'

    # The name is used as-is when it already refers to an existing file
    if os.path.isfile(executable):
        return executable

    for dirname in path.split(os.pathsep):
        filename = os.path.join(dirname, executable)
        if os.path.isfile(filename):
            # the file exists, we have a shot at spawn working
            return filename
    return None
def sync():
    """Force writing of everything to disk. No-op on systems without os.sync()"""
    flush_to_disk = getattr(os, 'sync', None)
    if flush_to_disk is not None:
        flush_to_disk()
def rename(old, new):
    """Rename a path. Transform arguments to handle non-ascii file paths"""
    source = mkpath(old)
    target = mkpath(new)
    os.rename(source, target)
# find_executable: the argument is converted with mkpath() on Python2 and
# with decode() on Python3; the result is always decoded.
if PY2:
    find_executable = wrap(mkpath, _find_executable, decorator=decode)
else:
    find_executable = wrap(decode, _find_executable, decorator=decode)
# Filesystem helpers: each wrapper converts its argument with mkpath()
# before calling the os function.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)
# readlink: when the os module has no readlink attribute, fall back to a
# function that returns its argument unchanged.
try:
    readlink = wrap(mkpath, os.readlink, decorator=decode)
except AttributeError:

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
# walk: only Python2 needs the path converted before calling os.walk()
if PY2:
    walk = wrap(mkpath, os.walk)
else:
    walk = os.walk