settings: write settings to disk robustly
[git-cola.git] / cola / core.py
blobd7d961005dd528f5e3c89a03c7cd341e7d6187c5
1 """This module provides core functions for handling unicode and UNIX quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
6 """
7 from __future__ import absolute_import, division, print_function, unicode_literals
8 import functools
9 import itertools
10 import mimetypes
11 import os
12 import platform
13 import subprocess
14 import sys
16 from .decorators import interruptable
17 from .compat import ustr
18 from .compat import PY2
19 from .compat import PY3
20 from .compat import WIN32
# Process exit statuses, mirroring /usr/include/stdlib.h:
#   EXIT_SUCCESS 0   successful exit status
#   EXIT_FAILURE 1   failing exit status
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# Extended exit statuses, mirroring /usr/include/sysexits.h:
#   EX_USAGE       64   command line usage error
#   EX_NOINPUT     66   cannot open input
#   EX_UNAVAILABLE 69   service unavailable
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69

# Encoding used whenever the caller does not name one
ENCODING = 'utf-8'

# Git stores raw bytes and does not care about encodings, so data is not
# guaranteed to be UTF-8 (or any text encoding at all).  decode() attempts
# these candidates in order when the encoding is unknown.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """Unicode string subclass that carries the encoding it was decoded with

    Strings decoded from bytes of an unknown encoding are wrapped in UStr so
    that the `encoding` attribute records which codec was used.  Patches must
    contain the original byte sequences, so the encoding is remembered in
    order to reconstruct those bytes later on.

    """

    def __new__(cls, string, encoding):
        """Create a UStr; raise ValueError when re-tagging with another encoding"""
        already_tagged = isinstance(string, UStr)
        if already_tagged and encoding != string.encoding:
            raise ValueError(
                'Encoding conflict: %s vs. %s' % (string.encoding, encoding)
            )
        if already_tagged:
            # Construct the new object from a plain unicode string
            string = ustr(string)

        instance = ustr.__new__(cls, string)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Return value.decode(encoding) when `value` has a "decode" method

    Values without a "decode" attribute are returned unchanged.
    """
    try:
        decoder = value.decode
    except AttributeError:
        return value
    return decoder(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """Decode an encoded byte string into a UStr

    `None` is returned as-is, and so is `value` when encoding == 'bytes'.
    A UStr is returned unchanged so that its recorded encoding is preserved.
    Any other unicode string is wrapped in a UStr tagged with ENCODING.

    Bytes are decoded by trying `encoding` (when given) followed by each
    entry of `_encoding_tests`; the first codec that succeeds is recorded in
    the result's `encoding` attribute.  When every candidate fails the value
    is decoded as ENCODING and the undecodable bytes are dropped.
    """
    if value is None:
        return None
    if isinstance(value, UStr):
        # Already decoded.  Re-wrapping with ENCODING would raise an
        # "Encoding conflict" ValueError for strings that were decoded
        # using one of the fallback encodings.
        return value
    if isinstance(value, ustr):
        return UStr(value, ENCODING)
    if encoding == 'bytes':
        return value

    if encoding is None:
        candidates = _encoding_tests
    else:
        candidates = itertools.chain([encoding], _encoding_tests)

    for candidate in candidates:
        try:
            decoded = value.decode(candidate, errors)
        except (LookupError, ValueError):
            # LookupError: unknown codec name, move on to the next candidate.
            # ValueError: includes UnicodeDecodeError for undecodable bytes.
            continue
        return UStr(decoded, candidate)

    # No candidate worked: decode as ENCODING, ignoring undecodable bytes
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """Encode a unicode string into bytes; other values pass through unchanged

    ENCODING (utf-8) is used when `encoding` is not given.  Characters that
    cannot be encoded are substituted via the 'replace' error handler.
    """
    if isinstance(string, ustr):
        codec = encoding or ENCODING
        return string.encode(codec, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the string type preferred by the platform"""
    # UNIX prefers bytes, while the Windows API requires unicode strings
    # regardless of the python version.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def decode_seq(seq, encoding=None):
    """Return a list containing each item of `seq` passed through decode()"""
    decoded = []
    for item in seq:
        decoded.append(decode(item, encoding=encoding))
    return decoded
def list2cmdline(cmd):
    """Decode each argument and join them with subprocess.list2cmdline()"""
    decoded_args = [decode(arg) for arg in cmd]
    return subprocess.list2cmdline(decoded_args)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open `filename` in binary mode and return its decoded contents"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None):
    """Encode `contents` and write it to `path`, opened in binary mode

    Returns whatever xwrite() returns for the underlying write.
    """
    with xopen(path, 'wb') as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read up to `size` from a filehandle, decode it, retry when interrupted"""
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode `content`, write it to a filehandle, retry when interrupted"""
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Return proc.wait(), retrying when the call is interrupted"""
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read one line from a filehandle and decode it, retry when interrupted"""
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    `add_env` is a mapping merged on top of a copy of os.environ.  Remaining
    keyword arguments are forwarded to subprocess.Popen().

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # NOTE: an explicitly passed `env` is replaced here; the subprocess
        # environment becomes os.environ plus the add_env entries.
        env = os.environ.copy()
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (utf-8) when it gets unicode strings.
    cmd = prep_for_subprocess(cmd, shell=extra.get('shell', False))

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        hidden_window = subprocess.STARTUPINFO()
        hidden_window.dwFlags = subprocess.STARTF_USESHOWWINDOW
        hidden_window.wShowWindow = subprocess.SW_HIDE
        extra['startupinfo'] = hidden_window

        if not no_win32_startupinfo:
            CREATE_NO_WINDOW = 0x08000000
            extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    if universal_newlines:
        bufsize = 1
    else:
        bufsize = -1

    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra
    )
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    With shell=True `cmd` is converted as a single value; otherwise each
    item of `cmd` is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(arg) for arg in cmd]
@interruptable
def communicate(proc):
    """Return proc.communicate(), retrying when the call is interrupted"""
    result = proc.communicate()
    return result
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The `encoding` keyword is consumed here and used to decode the output;
    the other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    proc = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(proc)
    output = decode(raw_output, encoding=encoding)
    errors = decode(raw_errors, encoding=encoding)
    # Missing or empty streams are reported as empty UStr values
    if not output:
        output = UStr('', ENCODING)
    if not errors:
        errors = UStr('', ENCODING)
    return (proc.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    argv = []
    for arg in args:
        argv.append(encode(arg))
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a detached background process on Windows and return its pid.

    `args` is the command as a sequence of strings.  It is copied before
    being adjusted so that the caller's sequence is never modified.
    """
    # Work on a private copy: args[0] is rewritten below, which previously
    # leaked into the caller's list and failed outright for tuples.
    args = list(args)

    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args.insert(0, sys.executable)

    if not shell:
        args[0] = _win32_find_exe(args[0])

    # see comment in start_command()
    convert = decode if PY3 else encode
    argv = [convert(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    Returns the first existing candidate, or `exe` unchanged when nothing
    is found.

    For reference, see:
    http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT
    if '.' not in exe:
        for ext in getenv('PATHEXT', '').split(os.pathsep):
            if ext.startswith('.'):
                candidates.append(exe + ext)
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' so that an unset PATH yields no directories instead
        # of calling .split() on None.
        for directory in getenv('PATH', '').split(os.pathsep):
            if not directory:
                continue
            for candidate in candidates:
                full_path = os.path.join(directory, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: pick the fork() implementation for this platform
fork = _fork_win32 if sys.platform in ('win32', 'cygwin') else _fork_posix
def _decorator_noop(x):
    """Identity function: hand back `x` untouched"""
    unchanged = x
    return unchanged
def wrap(action, fn, decorator=None):
    """Build a function that calls fn(action(...)) and post-processes the result

    The wrapper's arguments are passed to `action`, and the value `action`
    returns becomes the single argument to `fn`.  The result is passed
    through `decorator` when one is given.
    """
    postprocess = _decorator_noop if decorator is None else decorator

    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return postprocess(fn(prepared))

    return wrapped
def decorate(decorator, fn):
    """Build a function that passes the result of `fn` through `decorator`"""

    @functools.wraps(fn)
    def decorated(*args, **kwargs):
        raw_result = fn(*args, **kwargs)
        return decorator(raw_result)

    return decorated
def getenv(name, default=None):
    """Return os.getenv(name, default) passed through decode()"""
    raw_value = os.getenv(name, default)
    return decode(raw_value)
def guess_mimetype(filename):
    """Robustly guess a filename's mimetype

    The filename is tried as given first.  It is retried in encoded form
    after a UnicodeEncodeError, and in decoded form after any other
    TypeError or ValueError.
    """
    try:
        guess = mimetypes.guess_type(filename)
    except UnicodeEncodeError:
        guess = mimetypes.guess_type(encode(filename))
    except (TypeError, ValueError):
        guess = mimetypes.guess_type(decode(filename))
    # guess_type() returns a (type, encoding) pair; only the type is wanted
    return guess[0]
def xopen(path, mode='r', encoding=None):
    """Open `path` after converting it with mkpath()

    `encoding` is only used for converting the path; it is not passed to
    open().
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, mode)
def print_stdout(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stdout"""
    text = msg + linesep
    # Python2 streams are given bytes encoded as ENCODING
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write `msg` followed by `linesep` to sys.stderr"""
    text = msg + linesep
    # Python2 streams are given bytes encoded as ENCODING
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print `msg` to stderr and exit the process with `status`"""
    print_stderr(msg, linesep=linesep)
    # sys.exit() raises SystemExit carrying the requested status
    sys.exit(status)
@interruptable
def node():
    """Return platform.node(), retrying when the call is interrupted"""
    name = platform.node()
    return name
# Wrappers that convert the path argument before calling into os / os.path.
# Results are decoded where a decorator is supplied.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)

# getcwd() variant selection
if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
# NOTE: find_executable() is originally from the stdlib's distutils package.
# A local copy is kept here so that this module does not depend on distutils
# being available.
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    A string listing directories separated by 'os.pathsep'; defaults to
    os.environ['PATH'], or to os.defpath when PATH is not set.
    On win32 '.exe' is appended when the name has a different extension.
    Returns the complete filename or None if not found.
    """
    if path is None:
        # Do not fail with KeyError when PATH is missing from the environment
        path = os.environ.get('PATH', os.defpath)

    _, ext = os.path.splitext(executable)
    if sys.platform == 'win32' and ext != '.exe':
        executable = executable + '.exe'

    # The name as given already points at a file
    if os.path.isfile(executable):
        return executable

    for directory in path.split(os.pathsep):
        candidate = os.path.join(directory, executable)
        if os.path.isfile(candidate):
            # the file exists, we have a shot at spawn working
            return candidate
    return None
def sync():
    """Force writing of everything to disk.  No-op on systems without os.sync()"""
    flush_to_disk = getattr(os, 'sync', None)
    if flush_to_disk is None:
        return
    flush_to_disk()
def rename(old, new):
    """Rename a path.  Transform arguments to handle non-ascii file paths"""
    source = mkpath(old)
    destination = mkpath(new)
    os.rename(source, destination)
# The argument is converted with mkpath() on Python2 and decode() on Python3
find_executable = wrap(
    mkpath if PY2 else decode, _find_executable, decorator=decode
)

# Wrappers that convert the path argument with mkpath() before calling into
# os / os.path.  Results are decoded where a decorator is supplied.
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)

try:
    readlink = wrap(mkpath, os.readlink, decorator=decode)
except AttributeError:
    # Fall back to a readlink() that returns its argument unchanged

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk)