app: flesh out NullArgs for recent changes
[git-cola.git] / cola / core.py
blobe907f0802624211e7d3122b66f35e85fe9bff890
"""This module provides core functions for handling Unicode and Unix quirks

The @interruptable functions retry when system calls are interrupted,
e.g. when Python raises an IOError or OSError with errno == EINTR.
"""
6 import functools
7 import itertools
8 import mimetypes
9 import os
10 import platform
11 import subprocess
12 import sys
14 from .decorators import interruptable
15 from .compat import ustr
16 from .compat import PY2
17 from .compat import PY3
18 from .compat import WIN32
# Process exit statuses.
#
# The first pair mirrors /usr/include/stdlib.h:
#   #define EXIT_SUCCESS 0  /* Successful exit status. */
#   #define EXIT_FAILURE 1  /* Failing exit status. */
EXIT_SUCCESS = 0
EXIT_FAILURE = 1

# The remaining values mirror /usr/include/sysexits.h:
#   #define EX_USAGE       64  /* command line usage error */
#   #define EX_NOINPUT     66  /* cannot open input */
#   #define EX_UNAVAILABLE 69  /* service unavailable */
EXIT_USAGE = 64
EXIT_NOINPUT = 66
EXIT_UNAVAILABLE = 69
# Default encoding
ENCODING = 'utf-8'

# Some files are not in UTF-8, and some are not in any text encoding at all.
# Remember that Git doesn't care about encodings (it saves binary data).
# decode() tries these encodings in order until one succeeds.
_encoding_tests = [
    ENCODING,
    'iso-8859-15',
    'windows1252',
    'ascii',
    # <-- add encodings here
]
class UStr(ustr):
    """Unicode string that carries the encoding it was decoded from

    Instances behave like ordinary Unicode strings and additionally expose
    an `encoding` attribute.  decode() returns UStr values when the source
    encoding is not known in advance; keeping the encoding around makes it
    possible to reconstruct the original byte sequences later, e.g. when
    generating patches.

    """

    def __new__(cls, string, encoding):
        """Create a UStr from `string` tagged with `encoding`

        Raises ValueError when `string` is already a UStr that was tagged
        with a different encoding.
        """
        if isinstance(string, UStr):
            conflicting = string.encoding != encoding
            if conflicting:
                raise ValueError(f'Encoding conflict: {string.encoding} vs. {encoding}')
            # Drop the wrapper so that a plain Unicode string gets re-wrapped
            string = ustr(string)

        instance = ustr.__new__(cls, string)
        instance.encoding = encoding
        return instance
def decode_maybe(value, encoding, errors='strict'):
    """Return value.decode(encoding) when value has a "decode" method

    Values without a "decode" attribute are returned unchanged.
    """
    if not hasattr(value, 'decode'):
        return value
    return value.decode(encoding, errors=errors)
def decode(value, encoding=None, errors='strict'):
    """decode(encoded_string) returns an un-encoded Unicode string

    Returns None for None, and returns `value` untouched when
    encoding == 'bytes'.  Unicode input is wrapped in a UStr; byte input is
    decoded by trying `encoding` (when given) followed by each entry of
    _encoding_tests, and the first encoding that succeeds is recorded on
    the returned UStr.  When every candidate fails, the bytes are decoded
    as ENCODING with undecodable bytes ignored.
    """
    if value is None:
        return None
    if isinstance(value, UStr):
        # Already decoded: keep the encoding it was originally decoded from.
        # Re-tagging it with ENCODING would raise "Encoding conflict" for any
        # string that was decoded from a non-default encoding.
        return UStr(value, value.encoding)
    if isinstance(value, ustr):
        return UStr(value, ENCODING)
    if encoding == 'bytes':
        return value

    if encoding is None:
        candidates = _encoding_tests
    else:
        candidates = itertools.chain([encoding], _encoding_tests)

    for candidate in candidates:
        try:
            return UStr(value.decode(candidate, errors), candidate)
        except ValueError:
            # UnicodeDecodeError is a ValueError; try the next encoding
            continue

    # Nothing matched: fall back to a lossy decode using the default encoding
    return UStr(value.decode(ENCODING, errors='ignore'), ENCODING)
def encode(string, encoding=None):
    """encode(string) returns a byte string encoded to UTF-8

    A different target `encoding` may be given.  Characters that cannot be
    encoded are replaced, and non-Unicode values are returned unchanged.
    """
    if isinstance(string, ustr):
        target = encoding or ENCODING
        return string.encode(target, 'replace')
    return string
def mkpath(path, encoding=None):
    """Convert a path into the form preferred by the current platform"""
    # The Windows API requires Unicode strings regardless of python version,
    # whereas UNIX prefers bytes.
    convert = decode if WIN32 else encode
    return convert(path, encoding=encoding)
def decode_seq(seq, encoding=None):
    """Return a list holding the decoded form of every item in seq"""
    decoded = []
    for item in seq:
        decoded.append(decode(item, encoding=encoding))
    return decoded
def list2cmdline(cmd):
    """Decode each argument and join them via subprocess.list2cmdline()"""
    arguments = [decode(argument) for argument in cmd]
    return subprocess.list2cmdline(arguments)
def read(filename, size=-1, encoding=None, errors='strict'):
    """Open filename in binary mode and return its decoded contents"""
    with xopen(filename, 'rb') as handle:
        contents = xread(handle, size=size, encoding=encoding, errors=errors)
    return contents
def write(path, contents, encoding=None, append=False):
    """Write a Unicode string to a file, appending when `append` is true

    Returns the result of the underlying write() call.
    """
    mode = 'ab' if append else 'wb'
    with xopen(path, mode) as handle:
        written = xwrite(handle, contents, encoding=encoding)
    return written
@interruptable
def xread(fh, size=-1, encoding=None, errors='strict'):
    """Read up to `size` from a file handle and decode the result

    The call is retried when interrupted.
    """
    data = fh.read(size)
    return decode(data, encoding=encoding, errors=errors)
@interruptable
def xwrite(fh, content, encoding=None):
    """Encode content and write it to a file handle

    The call is retried when interrupted.
    """
    payload = encode(content, encoding=encoding)
    return fh.write(payload)
@interruptable
def wait(proc):
    """Block until a subprocess finishes and return the result of proc.wait()

    The call is retried when interrupted.
    """
    status = proc.wait()
    return status
@interruptable
def readline(fh, encoding=None):
    """Read a single line from a file handle and decode it

    The call is retried when interrupted.
    """
    line = fh.readline()
    return decode(line, encoding=encoding)
@interruptable
def start_command(
    cmd,
    cwd=None,
    add_env=None,
    universal_newlines=False,
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    no_win32_startupinfo=False,
    stderr=subprocess.PIPE,
    **extra,
):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    cmd is an argument list, or a single command string when shell=True is
    passed through `extra`.  add_env is an optional mapping of variables
    layered on top of the `env` keyword argument when one is supplied, and
    on top of a copy of os.environ otherwise.  On Windows,
    no_win32_startupinfo=True skips the CREATE_NO_WINDOW creation flag; the
    window-hiding STARTUPINFO is still applied.  All remaining keyword
    arguments are forwarded to subprocess.Popen.

    """
    env = extra.pop('env', None)
    if add_env is not None:
        # Honor an explicitly provided env instead of silently replacing it
        # with os.environ.  Always work on a copy so that neither the
        # caller's mapping nor os.environ is modified.
        env = os.environ.copy() if env is None else dict(env)
        env.update(add_env)

    # Python3 on windows always goes through list2cmdline() internally inside
    # of subprocess.py so we must provide Unicode strings here otherwise
    # Python3 breaks when bytes are provided.
    #
    # Additionally, the preferred usage on Python3 is to pass Unicode
    # strings to subprocess.  Python will automatically encode into the
    # default encoding (UTF-8) when it gets Unicode strings.
    shell = extra.get('shell', False)
    cmd = prep_for_subprocess(cmd, shell=shell)

    if WIN32 and cwd == getcwd():
        # Windows cannot deal with passing a cwd that contains Unicode
        # but we luckily can pass None when the supplied cwd is the same
        # as our current directory and get the same effect.
        # Not doing this causes Unicode encoding errors when launching
        # the subprocess.
        cwd = None

    if PY2 and cwd:
        cwd = encode(cwd)

    if WIN32:
        # If git-cola is invoked on Windows using "start pythonw git-cola",
        # a console window will briefly flash on the screen each time
        # git-cola invokes git, which is very annoying.  The code below
        # prevents this by ensuring that any window will be hidden.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        extra['startupinfo'] = startupinfo

        if not no_win32_startupinfo:
            CREATE_NO_WINDOW = 0x08000000
            extra['creationflags'] = CREATE_NO_WINDOW

    # Use line buffering when in text/universal_newlines mode,
    # otherwise use the system default buffer size.
    bufsize = 1 if universal_newlines else -1
    return subprocess.Popen(
        cmd,
        bufsize=bufsize,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        universal_newlines=universal_newlines,
        **extra,
    )
def prep_for_subprocess(cmd, shell=False):
    """Decode on Python3, encode on Python2

    With shell=True cmd is converted as a single value; otherwise each
    element of cmd is converted and a new list is returned.
    """
    # See the comment in start_command()
    convert = decode if PY3 else encode
    if shell:
        return convert(cmd)
    return [convert(argument) for argument in cmd]
@interruptable
def communicate(proc):
    """Return the result of proc.communicate(), retrying when interrupted"""
    streams = proc.communicate()
    return streams
def run_command(cmd, *args, **kwargs):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.
    The results are formatted as a 3-tuple: (exit_code, output, errors)
    The `encoding` keyword argument is consumed here and used to decode the
    output and errors.  The other arguments are passed on to start_command().

    """
    encoding = kwargs.pop('encoding', None)
    process = start_command(cmd, *args, **kwargs)
    raw_output, raw_errors = communicate(process)
    # Missing or empty streams are reported as empty UStr values
    output = decode(raw_output, encoding=encoding) or UStr('', ENCODING)
    errors = decode(raw_errors, encoding=encoding) or UStr('', ENCODING)
    return (process.returncode, output, errors)
@interruptable
def _fork_posix(args, cwd=None, shell=False):
    """Launch a process in the background and return its pid."""
    argv = [encode(argument) for argument in args]
    child = subprocess.Popen(argv, cwd=cwd, shell=shell)
    return child.pid
def _fork_win32(args, cwd=None, shell=False):
    """Launch a background process using crazy win32 voodoo.

    Returns the pid of the detached process.  `args` is never modified;
    any sequence of arguments (list or tuple) is accepted.
    """
    # Work on a private copy so that resolving the executable below does not
    # rewrite the caller's argument list.
    args = list(args)

    # This is probably wrong, but it works.  Windows.. Wow.
    if args[0] == 'git-dag':
        # win32 can't exec python scripts
        args.insert(0, sys.executable)

    if not shell:
        args[0] = _win32_find_exe(args[0])

    # see comment in start_command()
    convert = decode if PY3 else encode
    argv = [convert(arg) for arg in args]

    DETACHED_PROCESS = 0x00000008  # Amazing!
    return subprocess.Popen(
        argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
    ).pid
def _win32_find_exe(exe):
    """Find the actual file for a Windows executable.

    This function goes through the same process that the Windows shell uses to
    locate an executable, taking into account the PATH and PATHEXT environment
    variables.  This allows us to avoid passing shell=True to subprocess.Popen.

    Returns the first existing candidate, or `exe` unchanged when nothing
    is found.

    For reference, see:
    https://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120

    """
    # try the argument itself
    candidates = [exe]
    # if argument does not have an extension, also try it with each of the
    # extensions specified in PATHEXT.  splitext() inspects the final path
    # component only, so a dot inside a directory name is not mistaken for
    # a file extension.
    if not os.path.splitext(exe)[1]:
        extensions = getenv('PATHEXT', '').split(os.pathsep)
        candidates.extend(exe + ext for ext in extensions if ext.startswith('.'))
    # search the current directory first
    for candidate in candidates:
        if exists(candidate):
            return candidate
    # if the argument does not include a path separator, search each of the
    # directories on the PATH
    if not os.path.dirname(exe):
        # Default to '' so that an unset PATH means "no directories" rather
        # than an AttributeError from calling split() on None.
        for path in getenv('PATH', '').split(os.pathsep):
            if not path:
                continue
            for candidate in candidates:
                full_path = os.path.join(path, candidate)
                if exists(full_path):
                    return full_path
    # not found, punt and return the argument unchanged
    return exe
# Portability wrappers: choose the background-launch implementation that
# matches the platform we are running on.
fork = _fork_win32 if sys.platform in {'win32', 'cygwin'} else _fork_posix
def _decorator_noop(x):
    """Identity function used by wrap() when no decorator is supplied"""
    return x
def wrap(action, func, decorator=None):
    """Build a function that feeds action(*args, **kwargs) into `func`

    The result of `func` is passed through `decorator` when one is given.
    """
    postprocess = _decorator_noop if decorator is None else decorator

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        prepared = action(*args, **kwargs)
        return postprocess(func(prepared))

    return wrapped
def decorate(decorator, func):
    """Build a function that passes the result of `func` through `decorator`"""

    @functools.wraps(func)
    def decorated(*args, **kwargs):
        outcome = func(*args, **kwargs)
        return decorator(outcome)

    return decorated
def getenv(name, default=None):
    """Look up an environment variable and decode its value"""
    raw = os.getenv(name, default)
    return decode(raw)
def guess_mimetype(filename):
    """Robustly guess a filename's mimetype

    The filename is tried as given first.  It is retried in encoded form
    after a UnicodeEncodeError, and in decoded form after any other
    TypeError or ValueError.
    """
    try:
        return mimetypes.guess_type(filename)[0]
    except UnicodeEncodeError:
        # Must precede ValueError: UnicodeEncodeError is a ValueError subclass
        return mimetypes.guess_type(encode(filename))[0]
    except (TypeError, ValueError):
        return mimetypes.guess_type(decode(filename))[0]
def xopen(path, mode='r', encoding=None):
    """Open a file with the specified mode

    The path is decoded into Unicode on Windows and encoded into bytes on Unix.
    `encoding` is applied to the path only; it is not passed to open().

    """
    native_path = mkpath(path, encoding=encoding)
    # pylint: disable=unspecified-encoding
    return open(native_path, mode)
def open_append(path, encoding=None):
    """Open a file for appending in UTF-8 text mode

    `encoding` is only used to convert the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'a', encoding='utf-8')
def open_read(path, encoding=None):
    """Open a file for reading in UTF-8 text mode

    `encoding` is only used to convert the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, encoding='utf-8')
def open_write(path, encoding=None):
    """Open a file for writing in UTF-8 text mode

    `encoding` is only used to convert the path.
    """
    native_path = mkpath(path, encoding=encoding)
    return open(native_path, 'w', encoding='utf-8')
def print_stdout(msg, linesep='\n'):
    """Write msg followed by linesep to sys.stdout

    The text is encoded to ENCODING first when running under Python2.
    """
    text = msg + linesep
    sys.stdout.write(encode(text, encoding=ENCODING) if PY2 else text)
def print_stderr(msg, linesep='\n'):
    """Write msg followed by linesep to sys.stderr

    The text is encoded to ENCODING first when running under Python2.
    """
    text = msg + linesep
    sys.stderr.write(encode(text, encoding=ENCODING) if PY2 else text)
def error(msg, status=EXIT_FAILURE, linesep='\n'):
    """Print msg to stderr and exit the process with `status`"""
    print_stderr(msg, linesep=linesep)
    sys.exit(status)
@interruptable
def node():
    """Return platform.node(), retrying when interrupted"""
    name = platform.node()
    return name
# Path helpers: arguments are converted before calling into the os module
# and, where a decorator is given, results are decoded on the way out.
abspath = wrap(mkpath, os.path.abspath, decorator=decode)
chdir = wrap(mkpath, os.chdir)
exists = wrap(mkpath, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)

if not PY2:
    getcwd = os.getcwd
elif hasattr(os, 'getcwdu'):
    # pylint: disable=no-member
    getcwd = os.getcwdu
else:
    getcwd = decorate(decode, os.getcwd)
# NOTE: find_executable() is originally from the stdlib's distutils package,
# which was deprecated in Python 3.10 and removed in Python 3.12 (PEP 632).
def _find_executable(executable, path=None):
    """Tries to find 'executable' in the directories listed in 'path'.

    A string listing directories separated by 'os.pathsep'; defaults to
    os.environ['PATH'], or os.defpath when PATH is not set.  On win32 an
    '.exe' suffix is appended when 'executable' does not already end in
    one.  Returns the complete filename or None if not found.

    """
    if path is None:
        # A missing PATH must not raise KeyError; use the platform default
        path = os.environ.get('PATH', os.defpath)

    _, ext = os.path.splitext(executable)
    if sys.platform == 'win32' and ext != '.exe':
        executable = executable + '.exe'

    # The name itself may already point at an existing file
    if os.path.isfile(executable):
        return executable

    for dirname in path.split(os.pathsep):
        filename = os.path.join(dirname, executable)
        if os.path.isfile(filename):
            # the file exists, we have a shot at spawn working
            return filename
    return None
def sync():
    """Force writing of everything to disk. No-op on systems without os.sync()"""
    if not hasattr(os, 'sync'):
        return
    os.sync()
def rename(old, new):
    """Rename a path. Transform arguments to handle non-ASCII file paths"""
    source = mkpath(old)
    destination = mkpath(new)
    os.rename(source, destination)
# Filesystem helpers: arguments are converted before calling into the os
# module and, where a decorator is given, results are decoded.
find_executable = wrap(
    mkpath if PY2 else decode, _find_executable, decorator=decode
)
isdir = wrap(mkpath, os.path.isdir)
isfile = wrap(mkpath, os.path.isfile)
islink = wrap(mkpath, os.path.islink)
listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
makedirs = wrap(mkpath, os.makedirs)

try:
    readlink = wrap(mkpath, os.readlink, decorator=decode)
except AttributeError:
    # os.readlink is not available here; return the path unchanged

    def _readlink_noop(p):
        return p

    readlink = _readlink_noop

realpath = wrap(mkpath, os.path.realpath, decorator=decode)
relpath = wrap(mkpath, os.path.relpath, decorator=decode)
remove = wrap(mkpath, os.remove)
stat = wrap(mkpath, os.stat)
unlink = wrap(mkpath, os.unlink)
walk = wrap(mkpath, os.walk) if PY2 else os.walk