cmds: update the selected filename when running rebase commands
[git-cola.git] / cola / core.py
blob1485b35ad0262d80b0e1f401f0c45ad1639ee1ad
1 """This module provides core functions for handling Unicode and Unix quirks
3 The @interruptable functions retry when system calls are interrupted,
4 e.g. when python raises an IOError or OSError with errno == EINTR.
5 """
6 import ctypes
7 import functools
8 import itertools
9 import mimetypes
10 import os
11 import platform
12 import subprocess
13 import sys
15 from .decorators import interruptable
16 from .compat import ustr
17 from .compat import PY2
18 from .compat import PY3
19 from .compat import WIN32
21 # /usr/include/stdlib.h
22 # #define EXIT_SUCCESS 0 /* Successful exit status. */
23 # #define EXIT_FAILURE 1 /* Failing exit status. */
24 EXIT_SUCCESS = 0
25 EXIT_FAILURE = 1
27 # /usr/include/sysexits.h
28 # #define EX_USAGE 64 /* command line usage error */
29 # #define EX_NOINPUT 66 /* cannot open input */
30 # #define EX_UNAVAILABLE 69 /* service unavailable */
31 EXIT_USAGE = 64
32 EXIT_NOINPUT = 66
33 EXIT_UNAVAILABLE = 69
35 # Default encoding
36 ENCODING = 'utf-8'
38 # Some files are not in UTF-8; some other aren't in any codification.
39 # Remember that GIT doesn't care about encodings (saves binary data)
40 _encoding_tests = [
41 ENCODING,
42 'iso-8859-15',
43 'windows1252',
44 'ascii',
45 # <-- add encodings here
49 class UStr(ustr):
50 """Unicode string wrapper that remembers its encoding
52 UStr wraps Unicode strings to provide the `encoding` attribute.
53 UStr is used when decoding strings of an unknown encoding.
54 In order to generate patches that contain the original byte sequences,
55 we must preserve the original encoding when calling decode()
56 so that it can later be used when reconstructing the original
57 byte sequences.
59 """
61 def __new__(cls, string, encoding):
62 if isinstance(string, UStr):
63 if encoding != string.encoding:
64 raise ValueError(f'Encoding conflict: {string.encoding} vs. {encoding}')
65 string = ustr(string)
67 obj = ustr.__new__(cls, string)
68 obj.encoding = encoding
69 return obj
72 def decode_maybe(value, encoding, errors='strict'):
73 """Decode a value when the "decode" method exists"""
74 if hasattr(value, 'decode'):
75 result = value.decode(encoding, errors=errors)
76 else:
77 result = value
78 return result
81 def decode(value, encoding=None, errors='strict'):
82 """decode(encoded_string) returns an un-encoded Unicode string"""
83 if value is None:
84 result = None
85 elif isinstance(value, ustr):
86 result = UStr(value, ENCODING)
87 elif encoding == 'bytes':
88 result = value
89 else:
90 result = None
91 if encoding is None:
92 encoding_tests = _encoding_tests
93 else:
94 encoding_tests = itertools.chain([encoding], _encoding_tests)
96 for enc in encoding_tests:
97 try:
98 decoded = value.decode(enc, errors)
99 result = UStr(decoded, enc)
100 break
101 except ValueError:
102 pass
104 if result is None:
105 decoded = value.decode(ENCODING, errors='ignore')
106 result = UStr(decoded, ENCODING)
108 return result
111 def encode(string, encoding=None):
112 """encode(string) returns a byte string encoded to UTF-8"""
113 if not isinstance(string, ustr):
114 return string
115 return string.encode(encoding or ENCODING, 'replace')
118 def mkpath(path, encoding=None):
119 # The Windows API requires Unicode strings regardless of python version
120 if WIN32:
121 return decode(path, encoding=encoding)
122 # UNIX prefers bytes
123 return encode(path, encoding=encoding)
126 def decode_seq(seq, encoding=None):
127 """Decode a sequence of values"""
128 return [decode(x, encoding=encoding) for x in seq]
131 def list2cmdline(cmd):
132 return subprocess.list2cmdline([decode(c) for c in cmd])
135 def read(filename, size=-1, encoding=None, errors='strict'):
136 """Read filename and return contents"""
137 with xopen(filename, 'rb') as fh:
138 return xread(fh, size=size, encoding=encoding, errors=errors)
141 def write(path, contents, encoding=None, append=False):
142 """Writes a Unicode string to a file"""
143 if append:
144 mode = 'ab'
145 else:
146 mode = 'wb'
147 with xopen(path, mode) as fh:
148 return xwrite(fh, contents, encoding=encoding)
151 @interruptable
152 def xread(fh, size=-1, encoding=None, errors='strict'):
153 """Read from a file handle and retry when interrupted"""
154 return decode(fh.read(size), encoding=encoding, errors=errors)
157 @interruptable
158 def xwrite(fh, content, encoding=None):
159 """Write to a file handle and retry when interrupted"""
160 return fh.write(encode(content, encoding=encoding))
163 @interruptable
164 def wait(proc):
165 """Wait on a subprocess and retry when interrupted"""
166 return proc.wait()
169 @interruptable
170 def readline(fh, encoding=None):
171 return decode(fh.readline(), encoding=encoding)
174 @interruptable
175 def start_command(
176 cmd,
177 cwd=None,
178 add_env=None,
179 universal_newlines=False,
180 stdin=subprocess.PIPE,
181 stdout=subprocess.PIPE,
182 no_win32_startupinfo=False,
183 stderr=subprocess.PIPE,
184 **extra,
186 """Start the given command, and return a subprocess object.
188 This provides a simpler interface to the subprocess module.
191 env = extra.pop('env', None)
192 if add_env is not None:
193 env = os.environ.copy()
194 env.update(add_env)
196 # Python3 on windows always goes through list2cmdline() internally inside
197 # of subprocess.py so we must provide Unicode strings here otherwise
198 # Python3 breaks when bytes are provided.
200 # Additionally, the preferred usage on Python3 is to pass Unicode
201 # strings to subprocess. Python will automatically encode into the
202 # default encoding (UTF-8) when it gets Unicode strings.
203 shell = extra.get('shell', False)
204 cmd = prep_for_subprocess(cmd, shell=shell)
206 if WIN32 and cwd == getcwd():
207 # Windows cannot deal with passing a cwd that contains Unicode
208 # but we luckily can pass None when the supplied cwd is the same
209 # as our current directory and get the same effect.
210 # Not doing this causes Unicode encoding errors when launching
211 # the subprocess.
212 cwd = None
214 if PY2 and cwd:
215 cwd = encode(cwd)
217 if WIN32:
218 # If git-cola is invoked on Windows using "start pythonw git-cola",
219 # a console window will briefly flash on the screen each time
220 # git-cola invokes git, which is very annoying. The code below
221 # prevents this by ensuring that any window will be hidden.
222 startupinfo = subprocess.STARTUPINFO()
223 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
224 extra['startupinfo'] = startupinfo
226 if WIN32 and not no_win32_startupinfo:
227 CREATE_NO_WINDOW = 0x08000000
228 extra['creationflags'] = CREATE_NO_WINDOW
230 # Use line buffering when in text/universal_newlines mode,
231 # otherwise use the system default buffer size.
232 bufsize = 1 if universal_newlines else -1
233 return subprocess.Popen(
234 cmd,
235 bufsize=bufsize,
236 stdin=stdin,
237 stdout=stdout,
238 stderr=stderr,
239 cwd=cwd,
240 env=env,
241 universal_newlines=universal_newlines,
242 **extra,
246 def prep_for_subprocess(cmd, shell=False):
247 """Decode on Python3, encode on Python2"""
248 # See the comment in start_command()
249 if shell:
250 if PY3:
251 cmd = decode(cmd)
252 else:
253 cmd = encode(cmd)
254 else:
255 if PY3:
256 cmd = [decode(c) for c in cmd]
257 else:
258 cmd = [encode(c) for c in cmd]
259 return cmd
262 @interruptable
263 def communicate(proc):
264 return proc.communicate()
267 def run_command(cmd, *args, **kwargs):
268 """Run the given command to completion, and return its results.
270 This provides a simpler interface to the subprocess module.
271 The results are formatted as a 3-tuple: (exit_code, output, errors)
272 The other arguments are passed on to start_command().
275 encoding = kwargs.pop('encoding', None)
276 process = start_command(cmd, *args, **kwargs)
277 (output, errors) = communicate(process)
278 output = decode(output, encoding=encoding)
279 errors = decode(errors, encoding=encoding)
280 exit_code = process.returncode
281 return (exit_code, output or UStr('', ENCODING), errors or UStr('', ENCODING))
284 @interruptable
285 def _fork_posix(args, cwd=None, shell=False):
286 """Launch a process in the background."""
287 encoded_args = [encode(arg) for arg in args]
288 return subprocess.Popen(encoded_args, cwd=cwd, shell=shell).pid
291 def _fork_win32(args, cwd=None, shell=False):
292 """Launch a background process using crazy win32 voodoo."""
293 # This is probably wrong, but it works. Windows.. Wow.
294 if args[0] == 'git-dag':
295 # win32 can't exec python scripts
296 args = [sys.executable] + args
298 if not shell:
299 args[0] = _win32_find_exe(args[0])
301 if PY3:
302 # see comment in start_command()
303 argv = [decode(arg) for arg in args]
304 else:
305 argv = [encode(arg) for arg in args]
307 DETACHED_PROCESS = 0x00000008 # Amazing!
308 return subprocess.Popen(
309 argv, cwd=cwd, creationflags=DETACHED_PROCESS, shell=shell
310 ).pid
313 def _win32_find_exe(exe):
314 """Find the actual file for a Windows executable.
316 This function goes through the same process that the Windows shell uses to
317 locate an executable, taking into account the PATH and PATHEXT environment
318 variables. This allows us to avoid passing shell=True to subprocess.Popen.
320 For reference, see:
321 https://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
324 # try the argument itself
325 candidates = [exe]
326 # if argument does not have an extension, also try it with each of the
327 # extensions specified in PATHEXT
328 if '.' not in exe:
329 extensions = getenv('PATHEXT', '').split(os.pathsep)
330 candidates.extend([(exe + ext) for ext in extensions if ext.startswith('.')])
331 # search the current directory first
332 for candidate in candidates:
333 if exists(candidate):
334 return candidate
335 # if the argument does not include a path separator, search each of the
336 # directories on the PATH
337 if not os.path.dirname(exe):
338 for path in getenv('PATH').split(os.pathsep):
339 if path:
340 for candidate in candidates:
341 full_path = os.path.join(path, candidate)
342 if exists(full_path):
343 return full_path
344 # not found, punt and return the argument unchanged
345 return exe
348 # Portability wrappers
349 if sys.platform in {'win32', 'cygwin'}:
350 fork = _fork_win32
351 else:
352 fork = _fork_posix
355 def _decorator_noop(x):
356 return x
359 def wrap(action, func, decorator=None):
360 """Wrap arguments with `action`, optionally decorate the result"""
361 if decorator is None:
362 decorator = _decorator_noop
364 @functools.wraps(func)
365 def wrapped(*args, **kwargs):
366 return decorator(func(action(*args, **kwargs)))
368 return wrapped
371 def decorate(decorator, func):
372 """Decorate the result of `func` with `action`"""
374 @functools.wraps(func)
375 def decorated(*args, **kwargs):
376 return decorator(func(*args, **kwargs))
378 return decorated
381 def getenv(name, default=None):
382 return decode(os.getenv(name, default))
385 def guess_mimetype(filename):
386 """Robustly guess a filename's mimetype"""
387 mimetype = None
388 try:
389 mimetype = mimetypes.guess_type(filename)[0]
390 except UnicodeEncodeError:
391 mimetype = mimetypes.guess_type(encode(filename))[0]
392 except (TypeError, ValueError):
393 mimetype = mimetypes.guess_type(decode(filename))[0]
394 return mimetype
397 def xopen(path, mode='r', encoding=None):
398 """Open a file with the specified mode and encoding
400 The path is decoded into Unicode on Windows and encoded into bytes on Unix.
402 return open(mkpath(path, encoding=encoding), mode)
405 def open_append(path, encoding=None):
406 """Open a file for appending in UTF-8 text mode"""
407 return open(mkpath(path, encoding=encoding), 'a', encoding='utf-8')
410 def open_read(path, encoding=None):
411 """Open a file for reading in UTF-8 text mode"""
412 return open(mkpath(path, encoding=encoding), encoding='utf-8')
415 def open_write(path, encoding=None):
416 """Open a file for writing in UTF-8 text mode"""
417 return open(mkpath(path, encoding=encoding), 'w', encoding='utf-8')
420 def print_stdout(msg, linesep='\n'):
421 msg = msg + linesep
422 if PY2:
423 msg = encode(msg, encoding=ENCODING)
424 sys.stdout.write(msg)
427 def print_stderr(msg, linesep='\n'):
428 msg = msg + linesep
429 if PY2:
430 msg = encode(msg, encoding=ENCODING)
431 sys.stderr.write(msg)
434 def error(msg, status=EXIT_FAILURE, linesep='\n'):
435 print_stderr(msg, linesep=linesep)
436 sys.exit(status)
439 @interruptable
440 def node():
441 return platform.node()
444 abspath = wrap(mkpath, os.path.abspath, decorator=decode)
445 chdir = wrap(mkpath, os.chdir)
446 exists = wrap(mkpath, os.path.exists)
447 expanduser = wrap(encode, os.path.expanduser, decorator=decode)
448 if PY2:
449 if hasattr(os, 'getcwdu'):
450 getcwd = os.getcwdu
451 else:
452 getcwd = decorate(decode, os.getcwd)
453 else:
454 getcwd = os.getcwd
457 # NOTE: find_executable() is originally from the stdlib, but starting with
458 # python3.7 the stdlib no longer bundles distutils.
459 def _find_executable(executable, path=None):
460 """Tries to find 'executable' in the directories listed in 'path'.
462 A string listing directories separated by 'os.pathsep'; defaults to
463 os.environ['PATH']. Returns the complete filename or None if not found.
465 if path is None:
466 path = os.environ['PATH']
468 paths = path.split(os.pathsep)
469 _, ext = os.path.splitext(executable)
471 if (sys.platform == 'win32') and (ext != '.exe'):
472 executable = executable + '.exe'
474 if not os.path.isfile(executable):
475 for dirname in paths:
476 filename = os.path.join(dirname, executable)
477 if os.path.isfile(filename):
478 # the file exists, we have a shot at spawn working
479 return filename
480 return None
482 return executable
485 def _fdatasync(fd):
486 """fdatasync the file descriptor. Returns True on success"""
487 try:
488 os.fdatasync(fd)
489 except OSError:
490 pass
493 def _fsync(fd):
494 """fsync the file descriptor. Returns True on success"""
495 try:
496 os.fsync(fd)
497 except OSError:
498 pass
501 def fsync(fd):
502 """Flush contents to disk using fdatasync() / fsync()"""
503 has_libc_fdatasync = False
504 has_libc_fsync = False
505 has_os_fdatasync = hasattr(os, 'fdatasync')
506 has_os_fsync = hasattr(os, 'fsync')
507 if not has_os_fdatasync and not has_os_fsync:
508 try:
509 libc = ctypes.CDLL('libc.so.6')
510 except OSError:
511 libc = None
512 has_libc_fdatasync = libc and hasattr(libc, 'fdatasync')
513 has_libc_fsync = libc and hasattr(libc, 'fsync')
514 if has_os_fdatasync:
515 _fdatasync(fd)
516 elif has_os_fsync:
517 _fsync(fd)
518 elif has_libc_fdatasync:
519 libc.fdatasync(fd)
520 elif has_libc_fsync:
521 libc.fsync(fd)
524 def rename(old, new):
525 """Rename a path. Transform arguments to handle non-ASCII file paths"""
526 os.rename(mkpath(old), mkpath(new))
529 if PY2:
530 find_executable = wrap(mkpath, _find_executable, decorator=decode)
531 else:
532 find_executable = wrap(decode, _find_executable, decorator=decode)
533 isdir = wrap(mkpath, os.path.isdir)
534 isfile = wrap(mkpath, os.path.isfile)
535 islink = wrap(mkpath, os.path.islink)
536 listdir = wrap(mkpath, os.listdir, decorator=decode_seq)
537 makedirs = wrap(mkpath, os.makedirs)
538 try:
539 readlink = wrap(mkpath, os.readlink, decorator=decode)
540 except AttributeError:
542 def _readlink_noop(p):
543 return p
545 readlink = _readlink_noop
547 realpath = wrap(mkpath, os.path.realpath, decorator=decode)
548 relpath = wrap(mkpath, os.path.relpath, decorator=decode)
549 remove = wrap(mkpath, os.remove)
550 stat = wrap(mkpath, os.stat)
551 unlink = wrap(mkpath, os.unlink)
552 if PY2:
553 walk = wrap(mkpath, os.walk)
554 else:
555 walk = os.walk